- 小型群聊和普通聊天差不多,只是标记群聊标签,都是写扩散
- 分配seq的方法为使用redis的incr 分布式ID生成 > Redis
- 上行和下行都使用了kafka消息队列的机制
- kafka参考消息队列
- 使用websocket作为长连接通道
- 客户端通过webSocket发送消息到msg_gateway
- msg_gateway通过gRPC调用chat的 UserSendMsg() 发送消息
- chat服务主要是本地生成唯一消息ID(去重)和发送时间
- 然后投递到Kafka,等待所有Kafka的Slave都收到消息后判断发送成,gRPC返回
- 给客户端回复ACK,携带错误码和服务端生成的MsgID等
- transfer中的消费组mysql消费到2条消息(发送者的发件箱、接收者的收件箱)
- 持久化到mysql中全量存储,主要是应对后台分析、审计等需求,客户端是从mongodb中拉取的(拉取后删除),这里和微信逻辑类似。微信号称不在服务器存储数据,所以你用微信登录PC端时,你会发现刚刚手机上的消息在PC上怎么看不到?要么是PC端没有pull的过程,要么是离线消息只针对APP端,PC端拉不到。
- 同理,transfer中消费组mongodb消费到2条消息
- 调用redis的incr,递增用户的消息序号,key格式为:"REDIS_USER_INCR_SEQ: " + UserID,所以是用户范围内递增,因为本身用户只有一个收件箱,没毛病。
- 插入mongodb中的chat collection
- 优先通过gRPC调用pusher进行推送,否则走Kafka,通过Pusher消费的方式推送
- pusher也同样通过gRPC调用msg_gateway的MsgToUser推送消息
- 通过websocket推送
- 用户b上线的时候,通过pull从mongodb中拉取离线消息(成功后会从mongodb中删除)
- 通过websocket创建长连接,安全性依赖https,没有自定义加密,使用json的[]byte数组结构体接收,gate微服务判断包的类型后发送到rpc微服务
- rpc微服务会将消息使用protobuf格式化后推送进入消息队列中(如果是通知类的消息且在线直接在线),并返回成功(并不会保证落库成功之后才进行返回)
- 从消息队列中拿出有两个消费组分别消费,一个负责mysql的直接落库(如果设置了的话)
- 另外一一个拿出来首先经过一大堆内部的chan缓冲
- 先向redis请求seq,使用redis的incr获取seq
- 插入mongo数据持久存储
- 通过websocket推送消息体(因为这部分的可以保证所有的消息seq连续递增,因此可以保证有序性)
- 上线的时候和tira-im 类似,都是通过seq和redis+mongo拉取消息
- 启动的时候根据mongo中的最大seq初始化redis中的seq
- mysql,mongodb会分别消费这个topic,一条消息会被消费多次,首先通过redis的自增id,然后将消息体储存到个人的mongodb中
- 使用push模型和个人收件箱(个人和小群),消息堆直接推送到客户端
- 大群使用类似helper-im 使用群收件箱,这个小群和大群似乎是创建群聊的时候设置的而不是后续变化的(这个感觉理论上也很难变化)
- redis的原子自增,保证seq不重不漏
- 幂等性保证,依赖kafka的特性(存疑)
- 实际上这个架构还是可能导致消息的丢失的,因为没有确保落库成功才ack到kafka
- redis生成的seq保证自增保证有序性,这里的redis扩展的时候可能需要上一致性hash的方法
- 似乎mongodb承担了收件箱排序的功能,和缓存的功能,没有进行redis缓存
- push模型,在线直接推body,省略上行拉取的过程
- 从kafka拿出来后直接标记ok,压根没有判断失败后重试的过程,只打了个日志,这样如果mysql断开连接落库失败,会导致消息的丢失
- 依赖redis的原子incr,如果出现宕机,需要从mongo中恢复最大的序号,redis可能成为一个瓶颈