消息中间件功能

应用解耦
  • 原来需要同步的两个应用/模块,使用消息队列后可以异步执行
流量削峰
  • 流量达到高峰时,通常使用限流算法来控制流量涌入系统,避免系统被击瘫,但是这种方式损失了一部分请求。
  • 使用消息队列可以缓冲大量请求,匀速消费。当消息堆积过多时,可以动态上线增加消费端,保证重要请求不会丢失
大数据处理
  • 消息中间件可以将日志、用户行为等数据文件作为消息收集到主题中,数据使用方可以订阅自己需要的数据,互不影响
异构系统
  • 跨语言

RocketMQ角色

nameserver

  • 类似于注册中心,提供了路由管理、服务注册、服务发现的功能,集群中其他角色需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报会被nameserver从列表中剔除
  • nameserver为了避免单点故障可以部署多个,保证高可用,但集群之间互不通信,没有主备的概念

    • nameserver是无状态的,使用内存存储broke、topic等信息,默认不会持久化

broker

  • 向每个nameserver注册自己的信息,负责存储和转发消息
  • broker集群

    • 高可用,master/slave机构,master可读可写,slave只能读,master将数据同步给slave
    • 指定相同的BrockerName,brokerId为0表示master,非0表示slave
  • topic

    • 对消息进行分类的一个逻辑概念
  • queue

    • 实际存放消息的物理单位,一个topic有多个queue
  • tag

    • 相比于topic一种更细粒度的划分
  • groupname

    • 标识一个生产组或一个消费组
  • offset

    • 消息实际被消费到的位置,queue数组的下标

producer

  • 消息生产者
  • 与其中一个nameserver节点建立长连接,获得topic路由,包括topic下面有几个queue,这些queue分布在哪些broker上
  • 与提供topic服务对应的master建立长连接,且定时发送心跳包

consumer

  • 通过nameserver获得topic路由信息,连接到对应的broker上消费
  • 与提供topic服务的master和slave都建立连接

RocketMQ消费模式

集群消费

  • 每个消息只被处理一次,只发个一个group中的一个consume
  • 每个订阅的group都会消费到消息
  • 消费状态由broker维护
  • 在消息重投时不能保证被路由到同一台机器上

广播消费

  • 每个订阅的group中的consumer都会消费一次
  • 消费进度由consumer维护
  • 消费失败不会重投

RocketMQ的分布式事务支持

  • 使用2PC方式实现
  • Half Message预处理消息,进行预提交
  • TransactionListener最终提交

    • executeLocalTransaction半消息发送成功用该方法执行本地事务
    • checkLocalTransaction检查本地事务状态

      • LocalTransactionState.COMMIT_MESSAGE 确认提交
      • LocalTransactionState.ROLLBACK_MESSAGE 回滚
      • LocalTransactionState.UNKNOWN 等待broker回查

RocketMQ消息获得方式

pull拉

推荐,由客户端决定何时消费消息

push推

有可能造成consumer端消息堆积过多又不能被其他consumer消费

RocketMQ消息消费问题

消息被消费后会被立即删除吗?

  • 不会,每条消息都被持久化到commitLog中
  • 每个consumer维持消费进度,消费消息后只是当前的consumer消费进度(commitLog中的offset)更新了
  • 48小时候会删除不再使用的commitLog文件

消息丢失

  • producer在发送消息后会收到sendResult,表示消息发送成功
  • 当消息发送失败的时候,重新发送,避免丢失(重试16次之后进死信队列)
  • 整个流程使用单机事务保证可靠性

消息重复消费

原因
  • 网络原因

    • consumer首次启动
    • broker没有收到ack导致消息重投
    • 消息的幂等性
  • 集群模式下消息消费以group为单位,每个group都会消费
解决方案(保证消息幂等性)
  • 数据库本地事务表,使用transactionID作为主键插入,重复消费时会插入失败,单机事务保证可靠
  • 单机时用ConcurrentHashMap的putIfAbsent
  • Redis setnx主键

如何保证消息顺序消费

使用单线程处理
  • 同一个topic、同一个queue发消息的时候一个线程去发送消息、消费的时候一个线程消费一个queue里的消息
  • 如果多个queue要求有序,设置最大消费线程数和最小消费线程数都为1

如何保证消息不丢失

检测消息丢失

利用消息队列的有序性,在producer端给每个发出的消息附加一个连续递增的序号,在consumer端检查序号的连续性

确保消息可靠传递
  • 生产阶段 | 捕获异常进行重发

    • 同步消费捕获异常进行重发
    • 异步消费在回调方法检查消费状态
  • 存储阶段 | 配置刷盘为同步刷盘(而不是异步)

    • 配置broker参数flushDiskType刷盘方式为SYNC_FLUSH同步刷盘
  • 消费阶段 | 处理完所有消费逻辑后再确认

    • 在执行完所有的消费逻辑之后再发送消费确认

消息堆积如何处理

消息堆积时间过长会超时吗?

RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时

解决办法
  • 控制生产者的速率
  • 简单来说上线更多consumer临时解决消息堆积问题
  • 如果堆积了海量数据

    • 注意:一定不能动生产方
    • 准备一个临时的topic,并设置queue的数量为原先好几倍,分部到多个broker中
    • 上线一个consumer将原来topic中的消息挪到新的topic里,不做业务处理
    • 上线多台consumer同时消费临时topic中的数据
    • 查找原先代码的bug
    • 消费完积压数据后,恢复原来的架构部署,使用原来的consumer消费

零拷贝技术

使用NIO的MappedByteBuffer

MQ选型

RabbitMQ

erlang开发,低延迟

RocketMQ

Java开发,面向互联网集群化功能丰富

Kafka

Scala开发,面向日志

ActiveMQ

Java开发,简单稳定,社区减少维护

最后修改:2020 年 09 月 03 日 11 : 00 AM
如果觉得我的文章对你有用,请随意赞赏