双 11 零点,阿里巴巴的订单系统峰值达到 47 万 TPS——每秒 47 万笔交易,每一笔都要扣库存、记流水、发通知,零丢失。这个场景对消息系统提出了三个苛刻要求:事务消息(本地事务和消息发送必须原子完成)、延迟消息(下单 30 分钟未支付自动取消)、顺序消息(同一订单的状态变更必须按序消费)。Kafka 做不到事务消息的优雅实现,RabbitMQ 做不到高吞吐下的顺序保证——这正是 RocketMQ 存在的理由。
一、RocketMQ 架构
1.1 核心组件
RocketMQ 由四个核心组件构成:
| 组件 | 职责 | 类比 | 高可用 |
|---|---|---|---|
| NameServer | 路由注册与发现 | 注册中心 | 无状态,多节点部署 |
| Broker | 消息存储与转发 | 消息服务器 | 主从复制,Dledger 自动切换 |
| Producer | 发送消息 | 发件人 | Group 内容错 |
| Consumer | 消费消息 | 收件人 | Group 内负载均衡 |
1.2 NameServer vs ZooKeeper
| 维度 | NameServer | ZooKeeper |
|---|---|---|
| 定位 | 轻量级路由注册 | 通用协调服务 |
| 一致性 | 最终一致(无选举) | 强一致(ZAB 协议) |
| 功能 | 路由注册/发现 | 分布式锁/选举/配置 |
| 运维 | 极简(无状态) | 复杂(需要维护 ZAB) |
| 性能 | 高(无共识开销) | 中(共识开销) |
RocketMQ 早期版本使用 ZooKeeper,后来改为自研 NameServer。核心原因:NameServer 之间互不通信,无需共识协议,极大地简化了运维。路由信息的最终一致性通过 Broker 定期心跳 + Producer 容错重试来保证。
1.3 Broker 存储模型
RocketMQ Broker 的存储模型与 Kafka 类似,但有一些关键差异:
| 存储文件 | 说明 | 类比 Kafka |
|---|---|---|
| CommitLog | 所有消息顺序写入,混合存储 | 等价于所有 Partition 共享一个日志 |
| ConsumeQueue | 按 Topic+QueueId 分组的偏移量索引 | 等价于 Partition 的索引 |
| IndexFile | 按 Key 和时间范围的索引 | Kafka 无等价物 |
# 查看存储结构ls /root/store/commitlog/ # CommitLog 文件(1GB 每个)ls /root/store/consumequeue/ # ConsumeQueue 目录ls /root/store/index/ # IndexFile
# 查看消息./mqadmin queryMsgById -i <msgId>./mqadmin queryMsgByKey -t TopicOrder -k orderKey123./mqadmin queryMsgByOffset -t TopicOrder -b broker-a -i 0 -o 1000二、事务消息
2.1 为什么需要事务消息?
在分布式系统中,“本地事务 + 消息发送”的一致性是一个经典问题:
| 方案 | 问题 |
|---|---|
| 先事务后发消息 | 事务成功但发消息失败 → 消息丢失 |
| 先发消息后事务 | 发消息成功但事务失败 → 消息多余 |
| 本地消息表 | 需要额外存储,轮询效率低 |
| 事务消息 | 两阶段提交,保证一致性 |
2.2 事务消息流程
RocketMQ 的事务消息采用两阶段提交 + 回查机制:
2.3 事务消息实现
// 1. 创建事务消息生产者TransactionMQProducer producer = new TransactionMQProducer("order-producer-group");producer.setNamesrvAddr("localhost:9876");
// 2. 设置事务监听器producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地事务:创建订单 Order order = JSON.parseObject(msg.getBody(), Order.class); orderService.createOrder(order); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { // 回查:检查本地事务状态 String orderId = msg.getUserProperty("orderId"); Order order = orderService.getOrder(orderId); if (order != null && order.getStatus() != OrderStatus.CANCELLED) { return LocalTransactionState.COMMIT_MESSAGE; } return LocalTransactionState.ROLLBACK_MESSAGE; }});
producer.start();
// 3. 发送事务消息Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());msg.putUserProperty("orderId", orderId);producer.sendMessageInTransaction(msg, null);2.4 事务消息的关键参数
| 参数 | 默认值 | 说明 |
|---|---|---|
transactionTimeout | 60s | 事务超时时间,超时后触发回查 |
transactionCheckMax | 15 | 最大回查次数 |
transactionCheckInterval | 60s | 回查间隔 |
事务消息的回查机制不是”银弹”——如果本地事务一直返回 UNKNOWN,Broker 会持续回查直到达到最大次数。务必确保 checkLocalTransaction 方法能快速、准确地返回事务状态,避免长时间回查。
三、延迟消息
3.1 延迟消息原理
RocketMQ 的延迟消息通过”临时 Topic + 定时转存”实现:
| delayLevel | 延迟时间 | delayLevel | 延迟时间 |
|---|---|---|---|
| 1 | 1s | 10 | 6min |
| 2 | 5s | 11 | 7min |
| 3 | 10s | 12 | 8min |
| 4 | 30s | 13 | 9min |
| 5 | 1min | 14 | 10min |
| 6 | 2min | 15 | 20min |
| 7 | 3min | 16 | 30min |
| 8 | 4min | 17 | 1h |
| 9 | 5min | 18 | 2h |
3.2 延迟消息使用
// 发送延迟消息DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");producer.setNamesrvAddr("localhost:9876");producer.start();
Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());msg.setDelayTimeLevel(3); // 10 秒后消费
SendResult result = producer.send(msg);System.out.println("Send result: " + result.getSendStatus());3.3 RocketMQ 5.x 任意延迟
RocketMQ 5.x 支持任意延迟时间(不再限制为 18 个级别):
// RocketMQ 5.x 任意延迟Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());msg.setDelayTimeSec(600); // 延迟 600 秒(10 分钟)// 或msg.setDelayTimeMs(600000L); // 延迟 600000 毫秒
SendResult result = producer.send(msg);| 版本 | 延迟方式 | 精度 | 适用场景 |
|---|---|---|---|
| 4.x | 固定 18 级 | 秒级 | 电商超时取消 |
| 5.x | 任意时间 | 毫秒级 | 精确延迟场景 |
3.4 延迟消息典型场景
// 场景:订单超时自动取消// 1. 创建订单时发送延迟消息public void createOrder(Order order) { // 保存订单到数据库 orderMapper.insert(order);
// 发送 30 分钟延迟消息 Message msg = new Message("TopicOrderTimeout", order.getId().getBytes()); msg.setDelayTimeLevel(16); // 30 分钟 producer.send(msg);}
// 2. 消费超时消息public void onOrderTimeout(MessageExt msg) { String orderId = new String(msg.getBody()); Order order = orderMapper.selectById(orderId);
if (order != null && order.getStatus() == OrderStatus.UNPAID) { // 订单未支付,自动取消 order.setStatus(OrderStatus.CANCELLED); orderMapper.updateById(order); // 释放库存 inventoryService.release(order.getProductId(), order.getQuantity()); }}四、顺序消息
4.1 全局有序 vs 分区有序
| 有序级别 | 说明 | 性能 | 实现方式 |
|---|---|---|---|
| 全局有序 | 所有消息严格按发送顺序消费 | 低(单队列) | 单 Topic 单 Queue |
| 分区有序 | 同一 Key 的消息按顺序消费 | 高(多队列) | 同 Key 路由到同一 Queue |
4.2 顺序消息实现
// 发送顺序消息:使用 MessageQueueSelectorDefaultMQProducer producer = new DefaultMQProducer("order-producer-group");producer.start();
// 按 orderId 选择队列,保证同一订单的消息进入同一队列SendResult result = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Long orderId = (Long) arg; int index = (int) (orderId % mqs.size()); return mqs.get(index); }}, orderId);
// 顺序消费:使用 MessageListenerOrderlyDefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");consumer.subscribe("TopicOrder", "*");
consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { processOrder(msg); } return ConsumeOrderlyStatus.SUCCESS; }});consumer.start();4.3 顺序消费 vs 并发消费
| 维度 | MessageListenerOrderly | MessageListenerConcurrently |
|---|---|---|
| 消费顺序 | 严格按队列顺序 | 不保证顺序 |
| 并发度 | 单线程消费单队列 | 多线程消费 |
| 失败处理 | 暂停当前队列消费 | 重试后进入延迟队列 |
| 吞吐量 | 低 | 高 |
| 适用场景 | 订单状态变更、数据同步 | 普通业务消息 |
顺序消息的代价是牺牲并发度——同一 Queue 的消息只能单线程消费。如果追求高吞吐,优先考虑”分区有序”而非”全局有序”,通过合理的 Key 设计将有序范围缩小到业务需要的最小粒度。
五、消息过滤
5.1 Tag 过滤
RocketMQ 支持 Tag 级别的消息过滤,Consumer 可以只订阅特定 Tag:
// Producer:设置 TagMessage msg = new Message("TopicOrder", "TagCreate", orderCreate.getBytes());Message msg2 = new Message("TopicOrder", "TagPay", orderPay.getBytes());
// Consumer:按 Tag 订阅// 方式一:订阅单个 Tagconsumer.subscribe("TopicOrder", "TagCreate");
// 方式二:订阅多个 Tag(|| 分隔)consumer.subscribe("TopicOrder", "TagCreate||TagPay");
// 方式三:订阅所有 Tagconsumer.subscribe("TopicOrder", "*");5.2 SQL92 过滤
RocketMQ 支持 SQL92 表达式过滤,可以基于消息属性进行复杂过滤:
// Producer:设置属性Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());msg.putUserProperty("amount", "1500");msg.putUserProperty("region", "east");msg.putUserProperty("vip", "true");
// Consumer:SQL92 过滤consumer.subscribe("TopicOrder", MessageSelector.bySql("amount > 1000 AND region = 'east'"));
// 其他 SQL92 示例MessageSelector.bySql("vip = 'true'");MessageSelector.bySql("amount BETWEEN 100 AND 1000");MessageSelector.bySql("region IN ('east', 'south')");MessageSelector.bySql("amount > 500 OR vip = 'true'");| 过滤方式 | 过滤位置 | 性能 | 灵活性 |
|---|---|---|---|
| Tag 过滤 | Broker + Consumer | 高 | 低(仅 Tag 匹配) |
| SQL92 过滤 | Broker | 中 | 高(属性条件组合) |
# 开启 SQL92 过滤(Broker 配置)enablePropertyFilter=true六、消息可靠性
6.1 Producer 端可靠性
| 策略 | 说明 | 配置 |
|---|---|---|
| 同步发送 | 等待 Broker 确认 | producer.send(msg) |
| 重试 | 发送失败自动重试 | retryTimesWhenSendFailed=3 |
| 故障延迟 | 发送失败后延迟该 Broker | sendLatencyFaultEnable=true |
DefaultMQProducer producer = new DefaultMQProducer("reliable-producer-group");producer.setRetryTimesWhenSendFailed(3); // 同步发送重试次数producer.setRetryTimesWhenSendAsyncFailed(3); // 异步发送重试次数producer.setSendLatencyFaultEnable(true); // 开启故障延迟
// 同步发送(最可靠)SendResult result = producer.send(msg);if (result.getSendStatus() != SendStatus.SEND_OK) { // 处理发送失败 log.error("Send failed: {}", result.getSendStatus());}6.2 Broker 端可靠性
| 策略 | 说明 | 配置 |
|---|---|---|
| 同步刷盘 | 消息写入磁盘后才返回 ACK | flushDiskType=SYNC_FLUSH |
| 同步复制 | 主从同步后才返回 ACK | brokerRole=SYNC_MASTER |
| 消息轨迹 | 记录消息全链路轨迹 | traceTopic=RMQ_SYS_TRACE_TOPIC |
# Broker 配置(高可靠模式)flushDiskType=SYNC_FLUSH # 同步刷盘brokerRole=SYNC_MASTER # 同步复制sendMessageThreadPoolNums=64 # 发送线程池flushCommitLogTimed=0 # 实时刷盘6.3 Consumer 端可靠性
// 消费重试机制DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("reliable-consumer-group");
// 并发消费:失败后自动重试consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { processMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 返回 RECONSUME_LATER,消息会进入重试队列 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }});
// 设置最大重试次数consumer.setMaxReconsumeTimes(16); // 默认 16 次| 重试次数 | 延迟时间 | 重试次数 | 延迟时间 |
|---|---|---|---|
| 1 | 10s | 9 | 7min |
| 2 | 30s | 10 | 8min |
| 3 | 1min | 11 | 9min |
| 4 | 2min | 12 | 10min |
| 5 | 3min | 13 | 20min |
| 6 | 4min | 14 | 30min |
| 7 | 5min | 15 | 1h |
| 8 | 6min | 16 | 2h |
超过最大重试次数后,消息会进入死信队列(%DLQ%ConsumerGroup)。务必监控死信队列,避免消息静默丢失。
七、RocketMQ vs Kafka vs RabbitMQ
| 维度 | RocketMQ | Kafka | RabbitMQ |
|---|---|---|---|
| 事务消息 | 原生支持(两阶段提交) | 支持(事务 API) | 不支持 |
| 延迟消息 | 原生支持 | 不支持(需自行实现) | 插件支持 |
| 顺序消息 | 原生支持 | 分区有序 | 队列有序 |
| 消息过滤 | Tag + SQL92 | 无 | 无 |
| 消息回溯 | 支持按时间戳 | 支持按 Offset | 不支持 |
| 吞吐量 | 十万级 TPS | 百万级 TPS | 万级 TPS |
| 延迟 | 毫秒级 | 毫秒级 | 微秒级 |
| 协议 | 自定义 | 自定义 | AMQP |
| 适用场景 | 电商、金融、事务 | 日志流、大数据 | 业务消息、路由 |
八、总结
上一章了解了RabbitMQ 与 AMQP 模型。
| 维度 | 关键要点 |
|---|---|
| 架构 | NameServer(无状态路由)+ Broker(CommitLog + ConsumeQueue) |
| 事务消息 | 两阶段提交 + 回查机制,保证本地事务与消息发送的一致性 |
| 延迟消息 | 临时 Topic + 定时转存,4.x 固定 18 级,5.x 任意延迟 |
| 顺序消息 | MessageQueueSelector + MessageListenerOrderly,分区有序 |
| 消息过滤 | Tag 过滤(高性能)+ SQL92 过滤(高灵活性) |
| 可靠性 | 同步刷盘 + 同步复制 + 消费重试 + 死信队列 |
RocketMQ 的核心优势是”业务友好”——事务消息解决分布式事务、延迟消息实现定时任务、顺序消息保证状态有序、SQL92 过滤减少网络传输。如果你的系统是电商/金融场景,RocketMQ 是最自然的选择。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






