match

  • 陌生人匹配服务
  1. api微服务先通过个人的信息计算一堆参数,然后通过rpc调用match微服务
  2. match将这堆参数拼接成string插入redis,通过这堆参数各种各样的计算,得到不同的zset,(key为时间戳),将个人的匹配信息加入redis(key为uid,val为uid)设置超时时间作为结束匹配的标志,
  3. 每一个zset中都有协程在brpop,拿出来之后使用这个人的uid加锁,标识已经在匹配了,分配唯一match_id
  4. 继续从队列中拿出用户进行匹配,成功匹配通过长连接下发匹配成功的通知
  5. 这部分非常复杂,只是简单的思路,具体的后面有空再看吧

gennerator

  • 分布式ID生成器
  • 通过提前拿号和异步检查分配的办法(每次分配之后查一下是否已经分配超过60%,如果超过了,异步开协程那下一个号段)解决mysql速度不够快的问题
  • 号码一定递增,但是不一定连续(因为如果宕机,下次回复只能拿到下一个号段,无法确定上一个号段是否发完了)
  • 通过负载均衡,使得同一个号码的拿取服务全都打到同一台机器上,避免了出现因为多个号段同时发号导致号码不递增的问题
  • 通过一致性hash实现自动容灾和恢复,类似[[redis实现#一致性hash算法(一致性哈希)]]
    • 构建主机数组,使用二分搜索搜索key对应处理的主机端口ip
    • 当处理的机器更改时候(添加或者删除),向旧的机器发送删除内存对应号段的通知(这里需要确保加入的都是新启动的,内存没有号段的,这个特性依赖微服务框架的隔离机制),这里的删除不是新的机器删除,而是框架中自带的client调用端发rpc请求删除
      1. rpc请求拿到所有的发号机器的ip和port,一致性hash算出自己访问哪一台机器
      2. 检查是否和上一次请求的机器相同,如果不相同,向上一台机器发送删除的通知,如果因为下线,连接失败没关系,如果在线,会删除缓存中的号段
      3. 向请求的机器发送rpc请求申请发号
    • hash环的计算全部都在client中,和服务端无关
  • 使用的是类似锁的机制(实际上使用的是sync.map)
	//从本地缓冲池获取已经设置的Segment Getter,并且调用获取设置的segment执行
	//检查是否能够生产ids, 无法生成则回源获取segment
	if cachedSegmentFn, ok := pool.segments.Load(key); ok {
		segment := (cachedSegmentFn.(SegmentIDCacheGetter))()
		if segment == nil {
			logger.Errorf("Generator segment not found:%s[3]", key)
			return nil
		}

		var isAccord bool
		nextIDs, isAccord = nextIDFn(segment, 0)

		//满足条件1
		if isAccord {
			return nextIDs
		}

		applyed = int64(len(nextIDs))
		//重置key对应的segment fn, 否则sync.Map的LoadOrStore无法正确执行
		pool.segments.Delete(key)
	}

	//multi-goroutine情况下并发获取的都是waitGetter,仅当唯一获取了segmentFn的Goroutine完成初始化后返回
	var startTime = time.Now()
	var initSegment *entity.NamespaceSegment

	var wg sync.WaitGroup
	wg.Add(1)
	waitGetter := func() *entity.NamespaceSegment {
		wg.Wait()
		return initSegment
	}

	//only one goroutine to call fn()
	//大量协程尝试写入这个sync.map
	segmentGetter, loaded := pool.segments.LoadOrStore(key, SegmentIDCacheGetter(waitGetter))
	if loaded {
		//因为多个协程竞争导致,大部分协程到这个地方,然后进行等待,**注意,这里等待的并不是自己的wg,因为自己的wg压根没有成功存进去,等待的是成功存进去的wg,即拿到执行权限的wg**
		//block here wait first getter done
		segment := (segmentGetter.(SegmentIDCacheGetter))()
		if segment == nil {
			return nil
		}

		nNextIDs, isAccord := nextIDFn(segment, applyed)
		if !isAccord {
			return nil
		}

		return append(nextIDs, nNextIDs...)
	}

	//成功写入的协程到这部分
	//Store成功,初始化Request
	initSegment = segmentFn()
	wrapGetter := func() *entity.NamespaceSegment {
		return initSegment
	}

	pool.segments.Store(key, SegmentIDCacheGetter(wrapGetter))
	//完成wg,通知其他协程起来
	wg.Done()

////////////////////////////////////////////////
// 这部分是nextIDFn的内容,底层还是使用atomic实现的
func(segment *entity.NamespaceSegment, applyedN int64) ([]int64, bool) {
		var apply = n - applyedN
		//有效的ID列表, 超出保留的ID列表
		var validIDs []int64
		var counter *int64
		if iCounter, ok := globalCounter.Load(key); !ok {
			return nil, false
		} else {
			counter = iCounter.(*int64)
		}

		// 注意这里是先进行原子递增,在进行拿出号码,这样可以实现多个协程一起拿的操作
		nextID := atomic.AddInt64(counter, apply)
		if nextID <= segment.MaxID {
			for s := nextID - apply + 1; s <= nextID; s++ {
				validIDs = append(validIDs, s)
			}
			return validIDs, true
		} else {
			for s := nextID - apply + 1; s <= segment.MaxID; s++ {
				validIDs = append(validIDs, s)
			}
			return validIDs, false
		}

负载均衡

类型

  • 层次指的是对应网络模型的哪个层次

四层负载均衡

  1. 通过IP+PORT进行负载均衡,在TCP网络层面进行划分

七层负载均衡(nginx)

  1. 在四层的基础上(没有四层就没七层)
  2. 将请求通过应用层协议进行区分和负载均衡