- 陌生人匹配服务

- api微服务先通过个人的信息计算一堆参数,然后通过rpc调用match微服务
- match将这堆参数拼接成string插入redis,通过这堆参数各种各样的计算,得到不同的zset,(key为时间戳),将个人的匹配信息加入redis(key为uid,val为uid)设置超时时间作为结束匹配的标志,
- 每一个zset中都有协程在brpop,拿出来之后使用这个人的uid加锁,标识已经在匹配了,分配唯一match_id
- 继续从队列中拿出用户进行匹配,成功匹配通过长连接下发匹配成功的通知
- 这部分非常复杂,只是简单的思路,具体的后面有空再看吧
- 分布式ID生成器

- 通过提前拿号和异步检查分配的办法(每次分配之后查一下是否已经分配超过60%,如果超过了,异步开协程那下一个号段)解决mysql速度不够快的问题
- 号码一定递增,但是不一定连续(因为如果宕机,下次回复只能拿到下一个号段,无法确定上一个号段是否发完了)
- 通过负载均衡,使得同一个号码的拿取服务全都打到同一台机器上,避免了出现因为多个号段同时发号导致号码不递增的问题
- 通过一致性hash实现自动容灾和恢复,类似[[redis实现#一致性hash算法(一致性哈希)]]
- 构建主机数组,使用二分搜索搜索key对应处理的主机端口ip
- 当处理的机器更改时候(添加或者删除),向旧的机器发送删除内存对应号段的通知(这里需要确保加入的都是新启动的,内存没有号段的,这个特性依赖微服务框架的隔离机制),这里的删除不是新的机器删除,而是框架中自带的client调用端发rpc请求删除
- rpc请求拿到所有的发号机器的ip和port,一致性hash算出自己访问哪一台机器
- 检查是否和上一次请求的机器相同,如果不相同,向上一台机器发送删除的通知,如果因为下线,连接失败没关系,如果在线,会删除缓存中的号段
- 向请求的机器发送rpc请求申请发号
- hash环的计算全部都在client中,和服务端无关
- 使用的是类似锁的机制(实际上使用的是sync.map)
//从本地缓冲池获取已经设置的Segment Getter,并且调用获取设置的segment执行
//检查是否能够生产ids, 无法生成则回源获取segment
if cachedSegmentFn, ok := pool.segments.Load(key); ok {
segment := (cachedSegmentFn.(SegmentIDCacheGetter))()
if segment == nil {
logger.Errorf("Generator segment not found:%s[3]", key)
return nil
}
var isAccord bool
nextIDs, isAccord = nextIDFn(segment, 0)
//满足条件1
if isAccord {
return nextIDs
}
applyed = int64(len(nextIDs))
//重置key对应的segment fn, 否则sync.Map的LoadOrStore无法正确执行
pool.segments.Delete(key)
}
//multi-goroutine情况下并发获取的都是waitGetter,仅当唯一获取了segmentFn的Goroutine完成初始化后返回
var startTime = time.Now()
var initSegment *entity.NamespaceSegment
var wg sync.WaitGroup
wg.Add(1)
waitGetter := func() *entity.NamespaceSegment {
wg.Wait()
return initSegment
}
//only one goroutine to call fn()
//大量协程尝试写入这个sync.map
segmentGetter, loaded := pool.segments.LoadOrStore(key, SegmentIDCacheGetter(waitGetter))
if loaded {
//因为多个协程竞争导致,大部分协程到这个地方,然后进行等待,**注意,这里等待的并不是自己的wg,因为自己的wg压根没有成功存进去,等待的是成功存进去的wg,即拿到执行权限的wg**
//block here wait first getter done
segment := (segmentGetter.(SegmentIDCacheGetter))()
if segment == nil {
return nil
}
nNextIDs, isAccord := nextIDFn(segment, applyed)
if !isAccord {
return nil
}
return append(nextIDs, nNextIDs...)
}
//成功写入的协程到这部分
//Store成功,初始化Request
initSegment = segmentFn()
wrapGetter := func() *entity.NamespaceSegment {
return initSegment
}
pool.segments.Store(key, SegmentIDCacheGetter(wrapGetter))
//完成wg,通知其他协程起来
wg.Done()
////////////////////////////////////////////////
// 这部分是nextIDFn的内容,底层还是使用atomic实现的
func(segment *entity.NamespaceSegment, applyedN int64) ([]int64, bool) {
var apply = n - applyedN
//有效的ID列表, 超出保留的ID列表
var validIDs []int64
var counter *int64
if iCounter, ok := globalCounter.Load(key); !ok {
return nil, false
} else {
counter = iCounter.(*int64)
}
// 注意这里是先进行原子递增,在进行拿出号码,这样可以实现多个协程一起拿的操作
nextID := atomic.AddInt64(counter, apply)
if nextID <= segment.MaxID {
for s := nextID - apply + 1; s <= nextID; s++ {
validIDs = append(validIDs, s)
}
return validIDs, true
} else {
for s := nextID - apply + 1; s <= segment.MaxID; s++ {
validIDs = append(validIDs, s)
}
return validIDs, false
}
- 通过IP+PORT进行负载均衡,在TCP网络层面进行划分
- 在四层的基础上(没有四层就没七层)
- 将请求通过应用层协议进行区分和负载均衡