1. 小型群聊和普通聊天差不多,只是标记群聊标签,都是写扩散
  2. 分配seq的方法为使用redis的incr 分布式ID生成 > Redis
  3. 上行和下行都使用了kafka消息队列的机制
  4. kafka参考消息队列
  5. 使用websocket作为长连接通道

详细流程

  1. 客户端通过webSocket发送消息到msg_gateway
  2. msg_gateway通过gRPC调用chat的 UserSendMsg() 发送消息
  3. chat服务主要是本地生成唯一消息ID(去重)和发送时间
  4. 然后投递到Kafka,等待所有Kafka的Slave都收到消息后判断发送成,gRPC返回
  5. 给客户端回复ACK,携带错误码和服务端生成的MsgID等
  6. transfer中的消费组mysql消费到2条消息(发送者的发件箱、接收者的收件箱)
  7. 持久化到mysql中全量存储,主要是应对后台分析、审计等需求,客户端是从mongodb中拉取的(拉取后删除),这里和微信逻辑类似。微信号称不在服务器存储数据,所以你用微信登录PC端时,你会发现刚刚手机上的消息在PC上怎么看不到?要么是PC端没有pull的过程,要么是离线消息只针对APP端,PC端拉不到。
  8. 同理,transfer中消费组mongodb消费到2条消息
  9. 调用redis的incr,递增用户的消息序号,key格式为:"REDIS_USER_INCR_SEQ: " + UserID,所以是用户范围内递增,因为本身用户只有一个收件箱,没毛病。
  10. 插入mongodb中的chat collection
  11. 优先通过gRPC调用pusher进行推送,否则走Kafka,通过Pusher消费的方式推送
  12. pusher也同样通过gRPC调用msg_gateway的MsgToUser推送消息
  13. 通过websocket推送
  14. 用户b上线的时候,通过pull从mongodb中拉取离线消息(成功后会从mongodb中删除)

发送端

  1. 通过websocket创建长连接,安全性依赖https,没有自定义加密,使用json的[]byte数组结构体接收,gate微服务判断包的类型后发送到rpc微服务
  2. rpc微服务会将消息使用protobuf格式化后推送进入消息队列中(如果是通知类的消息且在线直接在线),并返回成功(并不会保证落库成功之后才进行返回)
  3. 从消息队列中拿出有两个消费组分别消费,一个负责mysql的直接落库(如果设置了的话)
    1. 另外一一个拿出来首先经过一大堆内部的chan缓冲
    2. 先向redis请求seq,使用redis的incr获取seq
    3. 插入mongo数据持久存储
    4. 通过websocket推送消息体(因为这部分的可以保证所有的消息seq连续递增,因此可以保证有序性)

接收端

  1. 上线的时候和tira-im 类似,都是通过seq和redis+mongo拉取消息
  2. 启动的时候根据mongo中的最大seq初始化redis中的seq

特点

  1. mysql,mongodb会分别消费这个topic,一条消息会被消费多次,首先通过redis的自增id,然后将消息体储存到个人的mongodb中
  2. 使用push模型和个人收件箱(个人和小群),消息堆直接推送到客户端
  3. 大群使用类似helper-im 使用群收件箱,这个小群和大群似乎是创建群聊的时候设置的而不是后续变化的(这个感觉理论上也很难变化)

唯一性

  • redis的原子自增,保证seq不重不漏
  • 幂等性保证,依赖kafka的特性(存疑)

必达性

  • 实际上这个架构还是可能导致消息的丢失的,因为没有确保落库成功才ack到kafka

有序性

  • redis生成的seq保证自增保证有序性,这里的redis扩展的时候可能需要上一致性hash的方法

优缺点

优点

  • 似乎mongodb承担了收件箱排序的功能,和缓存的功能,没有进行redis缓存
  • push模型,在线直接推body,省略上行拉取的过程

缺点

  • 从kafka拿出来后直接标记ok,压根没有判断失败后重试的过程,只打了个日志,这样如果mysql断开连接落库失败,会导致消息的丢失
  • 依赖redis的原子incr,如果出现宕机,需要从mongo中恢复最大的序号,redis可能成为一个瓶颈

架构图