消息中间件功能
应用解耦
- 原来需要同步的两个应用/模块,使用消息队列后可以异步执行
流量削峰
- 流量达到高峰时,通常使用限流算法来控制流量涌入系统,避免系统被击瘫,但是这种方式损失了一部分请求。
- 使用消息队列可以缓冲大量请求,匀速消费。当消息堆积过多时,可以动态上线增加消费端,保证重要请求不会丢失
大数据处理
- 消息中间件可以将日志、用户行为等数据文件作为消息收集到主题中,数据使用方可以订阅自己需要的数据,互不影响
异构系统
- 跨语言
RocketMQ角色
nameserver
- 类似于注册中心,提供了路由管理、服务注册、服务发现的功能,集群中其他角色需要定时向nameserver上报自己的状态,以便互相发现彼此,超时不上报会被nameserver从列表中剔除
nameserver为了避免单点故障可以部署多个,保证高可用,但集群之间互不通信,没有主备的概念
- nameserver是无状态的,使用内存存储broke、topic等信息,默认不会持久化
broker
- 向每个nameserver注册自己的信息,负责存储和转发消息
broker集群
- 高可用,master/slave机构,master可读可写,slave只能读,master将数据同步给slave
- 指定相同的BrockerName,brokerId为0表示master,非0表示slave
topic
- 对消息进行分类的一个逻辑概念
queue
- 实际存放消息的物理单位,一个topic有多个queue
tag
- 相比于topic一种更细粒度的划分
groupname
- 标识一个生产组或一个消费组
offset
- 消息实际被消费到的位置,queue数组的下标
producer
- 消息生产者
- 与其中一个nameserver节点建立长连接,获得topic路由,包括topic下面有几个queue,这些queue分布在哪些broker上
- 与提供topic服务对应的master建立长连接,且定时发送心跳包
consumer
- 通过nameserver获得topic路由信息,连接到对应的broker上消费
- 与提供topic服务的master和slave都建立连接
RocketMQ消费模式
集群消费
- 每个消息只被处理一次,只发个一个group中的一个consume
- 每个订阅的group都会消费到消息
- 消费状态由broker维护
- 在消息重投时不能保证被路由到同一台机器上
广播消费
- 每个订阅的group中的consumer都会消费一次
- 消费进度由consumer维护
- 消费失败不会重投
RocketMQ的分布式事务支持
- 使用2PC方式实现
- Half Message预处理消息,进行预提交
TransactionListener最终提交
- executeLocalTransaction半消息发送成功用该方法执行本地事务
checkLocalTransaction检查本地事务状态
- LocalTransactionState.COMMIT_MESSAGE 确认提交
- LocalTransactionState.ROLLBACK_MESSAGE 回滚
- LocalTransactionState.UNKNOWN 等待broker回查
RocketMQ消息获得方式
pull拉
推荐,由客户端决定何时消费消息
push推
有可能造成consumer端消息堆积过多又不能被其他consumer消费
RocketMQ消息消费问题
消息被消费后会被立即删除吗?
- 不会,每条消息都被持久化到commitLog中
- 每个consumer维持消费进度,消费消息后只是当前的consumer消费进度(commitLog中的offset)更新了
- 48小时候会删除不再使用的commitLog文件
消息丢失
- producer在发送消息后会收到sendResult,表示消息发送成功
- 当消息发送失败的时候,重新发送,避免丢失(重试16次之后进死信队列)
- 整个流程使用单机事务保证可靠性
消息重复消费
原因
网络原因
- consumer首次启动
- broker没有收到ack导致消息重投
- 消息的幂等性
- 集群模式下消息消费以group为单位,每个group都会消费
解决方案(保证消息幂等性)
- 数据库本地事务表,使用transactionID作为主键插入,重复消费时会插入失败,单机事务保证可靠
- 单机时用ConcurrentHashMap的putIfAbsent
- Redis setnx主键
如何保证消息顺序消费
使用单线程处理
- 同一个topic、同一个queue发消息的时候一个线程去发送消息、消费的时候一个线程消费一个queue里的消息
- 如果多个queue要求有序,设置最大消费线程数和最小消费线程数都为1
如何保证消息不丢失
检测消息丢失
利用消息队列的有序性,在producer端给每个发出的消息附加一个连续递增的序号,在consumer端检查序号的连续性
确保消息可靠传递
生产阶段 | 捕获异常进行重发
- 同步消费捕获异常进行重发
- 异步消费在回调方法检查消费状态
存储阶段 | 配置刷盘为同步刷盘(而不是异步)
- 配置broker参数flushDiskType刷盘方式为SYNC_FLUSH同步刷盘
消费阶段 | 处理完所有消费逻辑后再确认
- 在执行完所有的消费逻辑之后再发送消费确认
消息堆积如何处理
消息堆积时间过长会超时吗?
RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时
解决办法
- 控制生产者的速率
- 简单来说上线更多consumer临时解决消息堆积问题
如果堆积了海量数据
- 注意:一定不能动生产方
- 准备一个临时的topic,并设置queue的数量为原先好几倍,分部到多个broker中
- 上线一个consumer将原来topic中的消息挪到新的topic里,不做业务处理
- 上线多台consumer同时消费临时topic中的数据
- 查找原先代码的bug
- 消费完积压数据后,恢复原来的架构部署,使用原来的consumer消费
零拷贝技术
使用NIO的MappedByteBuffer
MQ选型
RabbitMQ
erlang开发,低延迟
RocketMQ
Java开发,面向互联网集群化功能丰富
Kafka
Scala开发,面向日志
ActiveMQ
Java开发,简单稳定,社区减少维护
1 条评论
555