tira
设计图
架构图
时序图
唯一
- 客户端生成ID(通过时间戳作为msgID)
- 服务端使用ID+TO+FROM 唯一标识信息setnx处理幂等性
必达
- 在IM部分的重试队列保证必达
- 服务端回应fin,客户端回应ack,标识受到信息
- 增量拉,避免某条消息丢失
有序
- seqID 递增机制(不连续但是一定自增),号段模式
- 防止seqID号段多次消费使用hash负载均衡使得所有的相同ID落在同一台实例上
- 通过只下发id不下发消息体保证
具体流程
客户端连接
-
请求先打到AWS四层负载均衡器上,然后进入服务器中负责网关的微服务(这部分可以使用nginx代替),微服务通过负载均衡策略将长连接请求打到nginx上
-
和edge-connect建立TCP长连接,生成唯一的tcp-id(自己edge内唯一即可),最好不用socketid,因为socketid复用容易导致出问题,可以维护自增变量(atom)
edge通过edge-id构建redis的消息队列,每个edge都有自己的队列 edge自己有个edge-id(connect-id)用于标识自己是哪个edge
-
edge生成3个协程,一个用于上游消息的阻塞接受,一个用于下行消息BRpop从redis阻塞读取读取到下行消息后放到downstreamchan中等待其他协程处理,一个用于5s心跳包发送保活(每次有消息包来都会重置)
-
将edge-id和tcp-id,以及连接的地址信息加入msg字段向route层发送
-
客户端向服务端发送auth-hello
-
route层先把body中的jwt token拿出来解析,获取uid,通过uid,val为hash,hashkey为resource,在redis检查是否连接已经建立,如果建立下发流冲突断开旧的连接,同时设置过期时间
-
服务端回应auth-reply,参考加密系统 > tira-im加密
-
验证成功后将通过uid,val为hash,hashkey为resource,插入redis,hash-val为tcp-id和edge-id,同时插入redis将tcp-id+edge-id作为key插入redis,val为个人信息和AES密钥,用于route层查找用户
edge全程不感知用户内容,仅仅作为维持长连接工具
数据包发送
-
数据包经过AWS四层负载均衡器打到edge-connect上
-
route先从session拿到对应tcp-id和edge-id对应客户端uid和uid对应的AES-key,然后AES解密得到原始报文,同时更新key的存活时间,应用层保活
这部分session中需要存储两部分的内容,一个是key为uid,val为tcp-id,edge-id等信息,用于保活,另一个是key为tcp-id+edge-id,val为aes密钥,用户获取这个tcp连接的密钥
-
route根据cmd字段分配不同的process同步处理,处理结束之后如果有应答,使用雪花算法生成id组装数据包,马上返回edge层面,edge将其加入downstream chan中下发消息,这部分的逻辑也可以改成插入nsq后立刻返回,inte拿出就返回ok(因为失败毕竟是少数情况,重新放回不会影响太多的性能,避免大量消息因为没有ack而在等待,最重要的是,需要修改字段标记处理阶段和重试次数),如果处理失败修改继续放入队列延迟重试(这部分可能造成消息的积压),向connect中推送也使用(这个一般不涉及处理连接失败和重试,而且这部分消息丢失的代价较小,因此不一定需要使用消息队列),route将数据包放到integration层立刻返回
-
integration拿到消息后进入复杂处理消息逻辑
- 检查消息合法性,是否被拉黑之类,被拉黑先客户端下行系统消息发送失败
- 通过sendid+recvid+clientid作为key,setnx插入redis,作为幂等性判断若插入失败证明信息已经发送,数据库总结 > 幂等性处理,
- 通过gennerator生成seqid(保证有序递增,不要求连续tira-others > gennerator,以个人维度发号)
- 消息落mysql数据库(这里使用to_id作为hash落库,查询自己发的消息麻烦,因此这个模式适合于android自带记录的私聊),这里使用private_key拼send_id作为private_key防止冲突
- 消息落redis,缓存时间为5分钟(这里没有进行判错校验,可能导致重复落mysql库)
- 加入redis的zset队列,seqid为key和val(没有进行判错校验,可能导致zset重复插入一直失败)
- zset队列相当于helper中的收件箱,个人和所有人的对话收件箱都在这里
- 这里注意:如果zset找不到这个key或者查的比里面最小的还小(比如出现宕机现象),需要从数据库拉取之后放入redis缓存容灾
- 获取redis zset最新的seqid,构建syncpull包,先to的所有在线resource发送syncpull,要求其下拉消息
-
syncpull包到route层,查询包中的uid对应的tcpid和stream-id,将消息塞到redis的消息队列中
-
edge从redis拿到消息封装之后塞到downstreamchan中等待发送
消息接受
- 客户端收到syncpull要求之后,检查自己的seqid和服务端发过来的seqid大小,如何自己的比较小,就把自己的seqid作为起始seqid发送syncpull请求
- edge层面直接转发到route层面
- route层面转发到integration层面
- integration层面通过zset找到比客户端seqid大的所有seqid,先在redis中找消息,找不到就去mysql找,找到之后将消息一条一条sendmessage返回到route层面,最后加上一个ack标识消息发送结束(否则客户端不知道消息什么时候发送终止)
- route将每个消息加密之后一个一个塞到redis队列中(每个TCP包大小有限,一次可能发不完,因此一个一个发)
- edge继续塞到downstreamchan等待发送
- 客户端收到消息之后,如果读取之后,上行ack携带读到的seqid
- edge传输到route,继续传输到integration,通过chan异步将redis收件箱中比客户端发的seqid小的都删除(删除收件箱)
数据包
-
edge层面packer头固定长度,先接收一个header,根据长度继续接受packer的body,header头包括版本号和长度等信息
-
除了开始的auth-hello的packer不加密,其他packer的body是加密的,密钥为serran+cliran+sessionid的md5结果
-
解决粘包问题关键,包括tcp中的数据也是这样解决粘包的问题
-
加密后的内容中存在content_type以及body,其中body是使用[]byte类型,根据不同大的type映射到不同的结构体(使用protobuf)
-
具体流程:
- edge首先接受header,根据payloadSize接受,并且将接收到的数据映射到Payload中
- edge将包上传到route的时候,封装成UpstreamMessage,使用包头中的type以及Payload的数据组装
var upstreamPacketWrapper = proto.UpstreamMessage{ ID: packet.Header().ID(), ProtocolVersion: packet.Header().ProtocolVersion(), Command: proto.NewCommandWithData(packet.Header().Command(), packet.Payload().Data()), Resource: packet.Header().Resource(), RemoteAddr: remoteAddr, LocalAddr: localAddr, StreamID: streamID, ConnectorID: connectorID,
}
3. route拿到之后根据type进行分发,并且使用protobuf反射解析到结构体 4. route分发到的不同的微服务
// edge-connector
// Header 包头
type Header struct {
//PacketID 用户空间唯一自增(版本号) 64bits
id uint64 `string:"id"`
//协议版本号 8bits
protoVersion uint8 `string:"proto_version"`
//指令枚举值 8bits
cmd uint32 `string:"cmd"`
//资源标记符:同一组资源需要处理流冲突 8bits
resource uint8 `string:"resource"`
//Payload大小, 最大支持2^31 byte长度数据 32bits
payloadSize uint32 `string:"payload_size"`
}
// Payload 包体
type Payload struct {
size uint32
data []byte
}
// coa-router-service
type UpstreamMessage struct {
//PacketID 用户空间唯一自增(版本号)
ID uint64 `json:"id" msgpack:"id" string:"id"`
//协议版本号
ProtocolVersion protocol.ProtocolVersion `json:"protocol_version" msgpack:"protocol_version" string:"protocol_version"`
//资源
Resource protocol.ClientResource `json:"resource" msgpack:"resource" string:"resource"`
//该包关联的远程以及本地地址,二元组组成唯一的TCP连接
RemoteAddr string `json:"remote_addr" msgpack:"remote_addr" string:"remote_addr"`
LocalAddr string `json:"local_addr" msgpack:"local_addr" string:"local_addr"`
//连接流ID
StreamID uint64 `json:"stream_id" msgpack:"stream_id" string:"stream_id"`
//网关ID
ConnectorID uint64 `json:"connector_id" msgpack:"connector_id" string:"connector_id"`
Command *Command `json:"command" msgpack:"command" string:"command"`
}
type Command struct {
Type protocol.CommandType `json:"type" msgpack:"type" string:"type"`
Data []byte `json:"data" msgpack:"data" string:"data"`
}
难点
- 如何让保证消息的唯一性,客户端有重试机制,大量消息出现时不会出现id重复的问题,宕机重启时候不会出现重复id
- 如何保证消息的有序性,可能出现下行消息的乱序,客户端无法主动解决这个问题,同一个群大量人发送消息时候消息处理的有序
- 如何让保证消息的必达性,弱网情况下下行的消息可能出现丢失错漏的现象,逻辑处理时候可能出现数据库宕机,程序宕机的情况,如何保证必达性
- 如何在多端登录时候下同步消息
- 如何保证消息的安全和隐私性
QA
为什么不使用websocket
- 要求信息加密性要求比较高,实时性要求比较高,因此使用简化版的SSL/TLS达到自协议加密作用加密系统
- 免去http协议升级过程,直接使用TCP连接,更加快速和便捷
- ws在nginx解密,未必到达内部服务可信层
- 公司之前有成熟的设计方案
为什么不使用主键作为seq
- 因为未来设计中群聊功能需要发号.无法使用id作为seq
- 但是这部分会出现群聊消息爆炸导致重要的个人消息拉取慢的问题(因为是一条线,必须先拉去前面的消息才能拉取后面的消息)
为什么不用websocket的保活,需要两个层面保活,如何实现
- session其他服务也需要用,业务层保活必不可少,这两个层面是不一样的,一个代表tcp在线,一个代表用户在线,并不能混用
- session保活依赖redis的自动过期,导致发现的时间长,tcp本地保活发现的时间更加快速,避免无效连接占用堆积的情况
- 缺点是每一个用户都要维护定时器,系统开销增加
//检查当前连接是否存活
//连接存活的机制:
//1) 如果有数据IO,则每一次上行以及下行均会更新activeTime,在30s过期内更新则表示存活
//2) 客户端定期的ping探测机制,能保证至少30s有一次activeTime更新(实际idleTimeout=pingIntervalTime+RT时间)
func (conn *Conn) keepAliveSchedule() {
idleTimeout := conn.srv.config.idleTimeout + conn.srv.config.roundTime
conn.checkTimer = time.NewTicker(idleTimeout)
go func() {
defer conn.checkTimer.Stop()
for {
select {
case <-conn.exitChan:
return
case tm := <-conn.checkTimer.C:
diff := tm.Sub(conn.activeTime)
if diff >= idleTimeout {
logger.Infof("conn[%d] remove staled connection after:%s", conn.ID, idleTimeout.String())
metric.EdgeConnectorCoreMetricReporter.PassiveDisConnCounter()
conn.setState(StateClosed)
conn.Exit()
return
}
}
}
}()
}
- connect层的保活直至维护长连接的状态,session的保活是为了维护用户的登录状态
- wesocket自带保活有以下缺点
- tcp 自带的 keepalive 时间非常久,一般情况业务是不能依赖的,配置在操作系统
- 中间还有个NGINX反向代理服务器,在一段时间内没收到数据也会自动断开连接
- tcp因为是虚拟的,导致没有数据传送时候掉线判断困难,ping/pong让tcp掉线响应更快
- 运营商的NAT,超过5mins没有消息,就会清除NAT映射,强行掐端长连接
IM中读扩散和写扩散的优缺点
读扩散
- 读扩散,以会话为单位,消息只存一份,群和会话作为收件箱,群收件箱很多
- 节约存储空间
- 但是每个群和会话都需要单独的序列号
- 用户需要储存对话列表,根据对话id拉取消息
- 如果是feed流面临合并麻烦的问题
写扩散
- 写扩散,以个人为单位发号,消息发给每个人的收件箱,
- feed流简单,直接拉展示
- 储存空间浪费,以个人作为维度发号
- 实时性很好,但是容易出现'写风暴'的现象
只下发id的缺点是什么
- 多次冗余的请求,浪费带宽,从两次变为了3次
- 延时增加,在聊天频率很高的情况下,本来只需要下发消息体一次,现在变为下发id,上行请求,下行body,特别是在群聊情况下容易混乱,一次请求没有结束,第二个又来了
- 增大了内存的消耗,必须引入缓存的机制(因为异步化)
- 如果是http请求的话可能出现http的body过大,导致响应时间更加长的情况(应该通过http的压缩)
只下发id的优点是什么,为什么不直接下发消息体
- 有序性的核心保证,拉取返回都是按顺序返回(因为有可能推送顺序问题,丢包的问题)
- 一定程度上保证了必达性,因为即使中间缺失也可以拿到完整正确的
- 一定程度上减轻了写风暴(即群聊的消息大量下发)的压力,减轻IM系统的压力
- 实现异步化拉取,上线拉和在线推的逻辑复用
- 将http和im模块独立,使得IM长连接系统压力减少
- 不需要ack了,pull成为ack,批量下行可以进行压缩,减少流量损耗
如何实现id和消息体两者相结合
双链机制
- 可以改进成为推送body,携带上一条的msg_id,客户端判断上一条是否是,如果是,那么接受这条,更新最新的msg_id,如果不是,那么上行拉取请求,从自己的的msg_id开始拉取,这样可以网络ok直接下发body,减少交互流程,网络不ok恢复成原来的,也可以保证通过双链保证有序性和必达性,这个方法有缺陷,需要查询上一条消息的msg_id,但是在并发写的情况下会出现读到的不一定是最新数据的读写一致性问题
微信机制
推送模型
- 下发ID和下发data两种模式相结合的形式
- 下发模式分为notify和data两种模式,data为数据体模式,notify为下行ID的形势
- data模式明显比notify模式快,但是可能出现消息丢失的现象(除非使用双链,但是双链重试使得效率降低),因此微信采取的data模式时候是在服务端转换的(接入层更改),这部分意味着由服务器保存一份用户接受到了的最新id,然后通过这个id查询你需要的消息体,打包下发
- Data的状态需由客户端触发形成,触发的时机为客户端主动上服务器来做过一次消息收取的请求。由于在Data状态下需由服务器的ConnectSvr主动去ReceiveSvr获取增量消息,服务器必须知道客户端此时的sequence才能做到通过sequence的比较增量下发消息。所以在进入NotifyData状态前,需等待客户端主动做一次消息收取的请求将此时客户端的sequence保存在ConnectSvr中。
- 在Data状态下,客户端必须对服务器下发的每一个Data进行Ack,并且服务器在下发了Data未收到Ack的这段时间内需关闭Data状态(即在图2的2.7和2.8步骤之间不能再做NotifyData下发)
- 直接推送更加快,但是可能出现乱序和丢失的问题,因此需要严格控制下发body的触发条件
如何实现多端接受消息
- 当自己下发消息的时候,将消息的id和to_uid下发,其他端就通过id和to_uid拉取自己的消息体(to_uid是必须的,因为通过to_uid分表),如果是helper类型的只需要下发groupid和msgid
当网络频繁断线(坐地铁怎么处理)
- 断开后不立刻删除状态消息,标记状态为断线,启动超时删除,短时间重连这更新为新连接
如何解决大量定时器占用大量内存资源造成消息卡顿问题
- 传统的计时器使用二叉堆实现,存取的复杂度是log(n),可以考虑使用时间轮实现,计时系统
这个项目的指标
- 日活10W,每个人平均2s就会有一个上行发消息(发消息,拉取的请求,心跳保活的请求综合),日活10W用户每秒约5000请求,峰值月为3倍,QPS大约在15000左右
- 发送的R99大约在100ms(通过connect层的rpc和route层的rpc,session的rpc共同作用,包括逻辑处理和加解密,20ms消息队列延迟),接受到的延迟大约为200ms(测试方法为自己想自己发送请求,两者的时间戳相减,再减去服务器携带的时间戳),时间主要用于消息在消息队列中的延迟(20ms),核心逻辑处理(10ms(mysql插入(1ms))),50ms的双向延迟
- 单机最大并发数大约在5w左右
最大连接数 = (内存大小/单个连接占用内存) * 系统负载因子
,如果不够会触发k8s的扩容机制
项目的性能瓶颈在哪里
- 微服务的层层调用(约60ms的延迟)
- 消息在消息队列中的延迟(两个共约40ms延迟)
- mysql的插入,核心层面的逻辑处理(10ms左右)
如何实现消息的撤回和审核
- 下发一条新消息,这条消息指向需要撤回的消息,前端收到之后自动撤回消息(通常使用这种)
- 快速,不需要修改消息的原有状态(这部分需要操作缓存和刷盘)
- 不需要更改现有的逻辑代码,代码入侵程度小,实现简单,逻辑复用(只需要发送消息)
- 需要消耗额外的空间去存储一条消息(磁盘是不值钱的,这点损耗可以接受,而且还能记录撤回时间等有用信息)
- 消息本体没有改变,如果是通过接口api请求依旧可以看到消息体
- 这种底层使用leveldb是最好的,这样可以直接set原来的key然后下发消息
- 删除缓存,修改数据库中消息的状态,下发websocket通知在线用户,离线用户拉到的就是已经修改状态了的
- 不需要消耗额外空间储存
- 消息本体改变,可以直接进行过滤,返回的消息内容不可见
- 慢,需要删除消息缓存和修改数据库(通常撤回的消息都是热点消息,因此比较慢),大新系统中不推荐
如何实现消息的已读未读
- 使用一张表每条消息插入一行,然后ack的消息修改对应消息的状态,每个line记录unreadid和readid,messageid(感觉飞书使用的是这种),客户端拉取消息后在拉取消息对应的已读未读状态,需要额外拉取
- 浪费空间,每条消息都对应一行
- 修改方便,可以实现每条消息的每个人识别已读未读,不会出现后面进群的人自动已读前面的消息的情况
- 修改速度慢,如果一个人连续读了100条消息,需要修改这100条的状态,在大群中更加明显
- 因为是另一张表,堆原有业务入侵小
- 需要额外拉取状态
- 群人数没影响
- 使用一条新消息,状态修改,其他不变,指向原消息,这样拉取到这条的时候直接替换原来的消息
- 可以和撤回消息的通道复用,更加简洁
- 极度浪费储存,一个ack对应一条消息(当然可以多个ack对应一个消息但是依旧非常浪费),这里如果底层存储使用的是leveldb,完全可以等到merge时候合并,参考 LevelDB底层
- 不需要额外拉取
- 下发已读的通知,也不需要上线拉取
- 每一条都需要更改一次,和上面一样的问题
- 记录用户的最大ack值,客户端拉取群所有人的最大ack,比较这个group的msgid和ack比较,
- 只能用于group维度发号的情况,不能用于个人收件箱的情况,个人的msgid不是一样的,无法使用,使得使用的范围小
- 当群里人很多的时候,计算和拉取都很麻烦
- 直接在message字段存储加入is_read字段
- 只能用与私聊,群聊人员变动无法使用
- 设计和使用简单
- 如果是最后设计了来说
- 如果是个人会话,个人收件箱+is_read/新消息
- 群会话 群收件箱+记录最大ack值
个人维度和会话维度发号优缺点
个人维度
- 可以实现个人的收件箱,
- 消息储存的冗余,每个群个人的收件箱都需要有一份msg_id,对于大新群聊来说每条消息大量插入每个人的收件箱使得其非常慢(每次插入是log(n))
- 无视会话的数量,每次拉取消息只需要拉取自己的收件箱,实现无感知
- 统计维度消息数量简单,适合用于私聊,不适合用于群聊,私聊不用生成group,可以直接发送,更加简单
- 必须设计新的发号器,用这个做群聊出现风暴的问题
- 拉取历史记录麻烦,自己发的消息查出来需要应用手动合并
会话维度
- 已读未读的实现更加简单,拉取历史消息更加简单(不需要),传播速度更加快,因为只需要插入一次zset
- 每个群维护一个发号器,每个人拉取未读消息麻烦,未读消息数量统计麻烦,适合用于群聊,不适合用于私聊,因为每个人都需要维护一个group,数量变得非常大
- 可以使用mysql id作为发号,需要维护每个人对应每个group的未读消息
- 其实可以结合使用,个人私聊使用个人收件箱,群聊使用群收件箱
微信方案
消息模型
- ack的在进入界面,退出界面,在界面停留几秒都会发送ack
私聊
- 私聊以个人维度发号,和tira中的私聊设计类似
- 这样可以很方便做管理(只需要拉一条时间线上的消息)
- 微信这部分的私聊消息很可能压根没有做MQ,估计为了保证不会丢消息,一定是写消息成功落库之后才会返回前端成功.收件箱为了持久化存储,使用的不是redis这种内存数据库,估计是类似leveldb这种
群聊
实际上微信基本上都是用户一个inbox,使得不支持大群,而且不好做已读未读
- 群聊使用群聊收件箱的机制,和help的机制一样,维护群聊收件箱
- 群聊维度使用群维度发号,以群id维度发号,类似读扩散
helper中为什么需要两个hash而不是一个
- 一个用于记录ack(做已读未读),一个用于记录未读群聊,如果使用一个那么第二个hash就不能删除项,每次拉取都需要遍历hash中所有的session(即群聊),非常低效很慢,分开可以优化未读信息的通知
如何实现容灾
- redis使用集群模式加主从架构加哨兵监控,崩溃时候可以自动切换成为slave的redis成为master
- mysql使用一主一从的双机主备高可用,如果master挂了slave自动成为masterMysql底层原理 > keepalive
- 微服务使用k8s统一管理,如果出现负载过大,会生成新的docker微服务
- 使用mira和普罗米修斯和grafana监控状态,警告接入飞书通知,通过grafana查看请求QPS(记录消息数,成功数,rpc调用平均延时)
- 如果逻辑层连接数据库一直失败(网络波动),尝试次数过多后会将数据写入文件并发出预警(这部分可以变成使用消息队列,只有落库成功才返回处理成功,否则让数据库进行重试)
- 限流器,避免出现服务的雪崩效应(A崩了,B承受A+B的请求也崩了),尝试失败后前端直接显示红点或者进行重试
- redis崩了从mysql拿数据的时候异步缓存到redis(这里可能出现刚上线请求压力大的问题,容易产生缓存雪崩,可以上限流或者提前缓存),这里的缓存是缓存收件箱
长连接维护占用大量的资源,如何优化
- k8s水平扩展增加实例
- 拆分im和http成为两个部分,减低im部分的压力
哪部分架构自己设计的,开发时间人员
- 路由层的具体逻辑自己设计的,参考了openim,goim等
- gennerator自己实现的,设计参考了美团leaf分布式id生成器
- 微服务的划分是参考了之前的im项目
- 一共3+1个人,花了两个月多一点
如何实现优雅退出/升级
- 微服务框架中自己携带有退出函数,接受退出信号
- 如果是 http 服务, 直接通过服务注册平台下线实例, 然后上线新实例解决
- 如果是长连接的部分
- 当出现退出信号的时候,查找所有的长连接,下发特定的通知然后断开长连接,清理长连接部分,客户端收到这个不会提示
- 高级一点的方式是通过 fork + exec + 信号的技术, 通过发送信号升级, 通过 fork+ exec新的程序实现升级, 老的直接 exit (利用的是 fork之后可以基层 socket的特性), 但是普遍比较复杂 + 如果是 k8s + pod 的形式难以执行 ,进程级别倒是适用
- k8s创建新的pod之后发送信号给旧的pod,旧的pod,会有一个最大退出时间的限制(超过直接kill),同理知道所有的pod全部被替换
如果出现错误包格式如何处理
- 直接掐断这个tcp连接,关闭改tcp连接
如何提高系统的高并发及可用性
- 可用性高并发核心还是分布式架构 > 保证高可用性的方法
- 水平扩展微服务提高承载能力
- 使用消息队列实现异步和解耦削峰
- 服务降级的兜底策略
- 数据库的容灾备份
产品迭代过程
- 第一版使用sync_pull打包成一个结构体一起发送
- 自己指定协议的包长度有限制,一次性推送未必足够长,因此采用FIN和LV方法
- 使用redis作为List缓冲,如果一次性发造成失去redis缓冲的作用
- 因为edge_connect同时服务多个用户,太长的包容易让connect长期为这个连接工作
- pull代码逻辑复用
- 因为改成一个一个发送,需要fin告诉客户端发送结束,也需要lv让服务端删除缓存,没有fin包,客户端无法知道什么时候发送结束,无法合适的时机发出lv
- 第一版使用uid作为streamID
- 开始没问题,后来出现多个设备登陆问题出现二元组相同的情况,不支持多端,因此使用雪花ID生成唯一ID或者号段ID,保证连接唯一性
tira-room
流程
- 客户端进入房间之前首先通过https发送请求,携带uid和roomid,后端处理之后返回token(这里也可能是sessionid)
- 客户端和edge建立tcp连接,发送hello包,使用公钥加密(token,cliran),route解密之后下行serran,返回,并且下行event到edge传递private_key以及uid,roomid
- edge判断是否是本地没有订阅的room,如果是就订阅room
- 上行消息发送的时候在edge进行解密,塞到c2r的chan之后立即返回达到异步处理
- c2r只负责拿到roomid作为消息的seq,并且把包装好的消息下行到route,route将这个消息发布到redis的channel(这个channel是以room为维度的,每个room都会有自己的channel)
- edge将这个channel下面的所有连接拿出来依次下发
QA
为什么edge保存privatekey
- 这里和私聊不同的地方在于不同人有不同的private_key,这样route发送一条消息就需要加密非常多次,性能上不可接受,因此在edge下发前自行加解密更加快和方便
为什么不是一个room一个privatekey
- 理论上可以,甚至我觉得在这个聊天室场景下更加好,没有用是考虑了和私聊同样的加解密步骤,改动小,以及安全性的考虑(因为room的privatekey是公共的)
- 如果是一个room一个privatekey,在生成room的时候生成放在session中,可以直接在下行的时候在route加密,这样第一更加贴合私聊的下行方案,更重要的是edge可以做到不感知业务,其实更加好
为什么使用订阅者模型而不是list
- 大量消息和大量订阅者,而且可以忍受消息一定程度的丢失,订阅模型更加合适,如果是list,无法实现消费多次多个edge拿消息
- 参考数据库总结 > redis做消息队列的几种方式以及缺点
helper
群聊实现
c2c翻版
- 使用收件箱逻辑,推送到,个人收件箱,但是检测type为group则在其他表查
- 依旧通过写扩散解决问题
help版本
- 业务上回避generator,更加简洁(但是这样导致做冷热分离很麻烦)
- 通过读写结合来实现
具体流程
- 客户端建立长连接之后,通过http发送上行请求,handle层简单判断之后放到service层
- 进入service复杂逻辑
- 幂等性校验
- 落mysql库获取seqID
- 加入redis缓存
- 加入group的zset(用于记录群里面的消息ID)
- 查询群成员
- 给除了自己以外的所有群成员添加收件箱(收件箱为hashmap结构,key为groupID,val为最新的非自己发送的msgID)
- 获取在线人员的名单
- 通知在线(除了自己)的人,通过edge长连接下发消息
- 异步审核和异步下发微信通知
区别
- 从个人收件箱转为群收件箱,减少了消息的冗余,个人的新消息通知使用个人的redis hash,维护客户最新的ack和最新的消息msgid,通过对比两者的值决定自己是否有新的消息,无法维护客户端最近的ack和最新的消息,缺点是取法具体显示多少条消息未读,优点是拉取历史消息更加方便(因为是基于群维度),而且因为只需要插入一次zset传播速度更加快
- 使用mysqlid作为seqid,不需要generator,因为消息以群维度生成和发送,无上面问题
- 将上行消息http化,实现逻辑更加轻量化,读扩散模型,tira为写扩散模型
- 长连接占用大量内存资源,IO密集型,因此将IM和http拆分成两个系统
产品迭代过程
自己发消息的问题
- 一开始使用set作为收件箱,保存未读group,发现自己发的消息出问题
- 如果im下发,那么会出现自己的消息是未读消息
- 如果不下发,因为拿到的msgid不是最新的(没有自己的msgid),无法ack
解决
- 使用hash作为收件箱,自己的消息不写收件箱,始终使用不是自己发的最新的msgid作为收件箱group的值
等待改进
- 消息可以类似openIM做冷热分离,避免mysql压力太大,消息的分级存储,14天内的消息可以直接存在leveldb中(毕竟消息的写多,读大多数是缓存读),或者mongodb中(IM系统几乎不涉及事物特性)
- 使用消息队列代替chan和redis的list,保证消息的进一步必达性,如果真的添加消息队列nsq,逻辑层从nsq拿出来后进行处理,拿出就返回ok(因为失败毕竟是少数情况,重新放回不会影响太多的性能,避免大量消息因为没有ack而在等待,最重要的是,需要修改字段标记处理阶段和重试次数),如果处理失败修改继续放入队列延迟重试(这部分可能造成消息的积压),向connect中推送也使用(这个一般不涉及处理连接失败和重试,而且这部分消息丢失的代价较小,因此不一定需要使用消息队列)
- 消息的必达性还是不能真正保证,最好是直接先落库才返回成功
- 没有做已读和未读(可以通过个人收件箱的hash存储自己读到的和最新的来实现)
- 无法确定每个group有多少条未读的消息(可以通过个人收件箱的hash存储自己读到的和最新的来实现)
- 客户端发送消息的顺序可能和服务端处理接收到的顺序不一样,导致接收方乱序(可以通过客户端的client_msg_id递增,服务端保存上一条client_msg_id,如果发现不匹配就返回失败,类似双链的机制,保证消息形成一个链式结构,但是还是无法解决服务端异步发id可能混乱,这里可以考虑将幂等性处理和发消息id放到前面(这两个都不怎么耗时)或者通过滑动窗口的形式,一次性处理多条消息(按照一定顺序),缺点是通讯重试次数太多,性能差)
- 可以改进成为推送body,携带上一条的msg_id,客户端判断上一条是否是,如果是,那么接受这条,更新最新的msg_id,如果不是,那么上行拉取请求,从自己的的msg_id开始拉取,这样可以网络ok直接下发body,减少交互流程,网络不ok恢复成原来的,也可以保证通过双链保证有序性和必达性,如果是直接下发body模式,这个逻辑也可以应用在下行双链保证有序性(每条消息携带上一条消息的id) , 这个的问题是怎么确定上一条 id 是多少 , 除非能保证连续绝对递增
- 没有做限流,太高的qps容易使得系统的消息处理时间大大延长
- 没有做系统的容灾,redis挂了使得未读信息通知消失,mysql挂了会使得消息发送失败当客户端以为成功的事故
- 没有做弱网情况下的优化
- 幂等性应该做双重幂等,redis+数据库private_key设置
- tira-im群聊的扩展性很差,因为按照这个模型,群聊1000个人就要发1000个号,插入1000次zset收件箱,导致写扩散的现象非常明显
- push 实例扩容时候一致性hash 没办法马上让其负载均衡, 很容易出现刚来的负载比较低的问题(因为 tcp长连接没办法直接转移)
参考
- http://www.52im.net/forum.php?mod=viewthread&tid=3631&highlight=%CE%A2%D0%C5
- https://juejin.cn/post/7070290856967667742