mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1936 字
5 分钟
RocketMQ:事务消息与延迟消息
2026-04-06

双 11 零点,阿里巴巴的订单系统峰值达到 47 万 TPS——每秒 47 万笔交易,每一笔都要扣库存、记流水、发通知,零丢失。这个场景对消息系统提出了三个苛刻要求:事务消息(本地事务和消息发送必须原子完成)、延迟消息(下单 30 分钟未支付自动取消)、顺序消息(同一订单的状态变更必须按序消费)。Kafka 做不到事务消息的优雅实现,RabbitMQ 做不到高吞吐下的顺序保证——这正是 RocketMQ 存在的理由。

一、RocketMQ 架构#

1.1 核心组件#

RocketMQ 由四个核心组件构成:

graph TB subgraph "RocketMQ 集群" NS1["NameServer 1"] NS2["NameServer 2"] B1["Broker Master<br/>Broker-A"] B2["Broker Slave<br/>Broker-A-S"] B3["Broker Master<br/>Broker-B"] B4["Broker Slave<br/>Broker-B-S"] end P["Producer Group"] -->|"1. 查找路由"| NS1 NS1 -->|"2. 返回路由"| P P -->|"3. 发送消息"| B1 P -->|"3. 发送消息"| B3 B1 -->|"注册路由"| NS1 B1 -->|"注册路由"| NS2 B3 -->|"注册路由"| NS1 B3 -->|"注册路由"| NS2 B1 -->|"主从同步"| B2 B3 -->|"主从同步"| B4 C["Consumer Group"] -->|"4. 拉取消息"| B1 C -->|"4. 拉取消息"| B3
组件职责类比高可用
NameServer路由注册与发现注册中心无状态,多节点部署
Broker消息存储与转发消息服务器主从复制,Dledger 自动切换
Producer发送消息发件人Group 内容错
Consumer消费消息收件人Group 内负载均衡

1.2 NameServer vs ZooKeeper#

维度NameServerZooKeeper
定位轻量级路由注册通用协调服务
一致性最终一致(无选举)强一致(ZAB 协议)
功能路由注册/发现分布式锁/选举/配置
运维极简(无状态)复杂(需要维护 ZAB)
性能高(无共识开销)中(共识开销)
Note

RocketMQ 早期版本使用 ZooKeeper,后来改为自研 NameServer。核心原因:NameServer 之间互不通信,无需共识协议,极大地简化了运维。路由信息的最终一致性通过 Broker 定期心跳 + Producer 容错重试来保证。

1.3 Broker 存储模型#

RocketMQ Broker 的存储模型与 Kafka 类似,但有一些关键差异:

graph TB subgraph "Broker 存储结构" CG["CommitLog<br/>(所有 Topic 共享)"] CG --> Q1["ConsumeQueue: TopicA-Queue0<br/>(偏移量索引)"] CG --> Q2["ConsumeQueue: TopicA-Queue1<br/>(偏移量索引)"] CG --> Q3["ConsumeQueue: TopicB-Queue0<br/>(偏移量索引)"] CG --> IF["IndexFile<br/>(按 Key/时间索引)"] end subgraph "ConsumeQueue 条目" E1["8B: CommitLog Offset"] E2["4B: Message Size"] E3["8B: Tag HashCode"] end
存储文件说明类比 Kafka
CommitLog所有消息顺序写入,混合存储等价于所有 Partition 共享一个日志
ConsumeQueue按 Topic+QueueId 分组的偏移量索引等价于 Partition 的索引
IndexFile按 Key 和时间范围的索引Kafka 无等价物
# 查看存储结构
ls /root/store/commitlog/ # CommitLog 文件(1GB 每个)
ls /root/store/consumequeue/ # ConsumeQueue 目录
ls /root/store/index/ # IndexFile
# 查看消息
./mqadmin queryMsgById -i <msgId>
./mqadmin queryMsgByKey -t TopicOrder -k orderKey123
./mqadmin queryMsgByOffset -t TopicOrder -b broker-a -i 0 -o 1000

二、事务消息#

2.1 为什么需要事务消息?#

在分布式系统中,“本地事务 + 消息发送”的一致性是一个经典问题:

方案问题
先事务后发消息事务成功但发消息失败 → 消息丢失
先发消息后事务发消息成功但事务失败 → 消息多余
本地消息表需要额外存储,轮询效率低
事务消息两阶段提交,保证一致性

2.2 事务消息流程#

RocketMQ 的事务消息采用两阶段提交 + 回查机制:

sequenceDiagram participant P as Producer participant B as Broker participant L as 本地事务 Note over P,B: 第一阶段:发送半消息 P->>B: 1. 发送半消息(Half Message) B->>B: 2. 存入半消息 Topic(对 Consumer 不可见) B-->>P: 3. 半消息发送成功 Note over P,L: 第二阶段:执行本地事务 P->>L: 4. 执行本地事务 alt 事务成功 L-->>P: COMMIT P->>B: 5a. 提交事务(COMMIT) B->>B: 6a. 消息转入目标 Topic(Consumer 可见) else 事务失败 L-->>P: ROLLBACK P->>B: 5b. 回滚事务(ROLLBACK) B->>B: 6b. 删除半消息 else 未知(网络超时) L-->>P: UNKNOWN Note over B: 等待回查 end Note over P,B: 回查机制 B->>P: 7. 事务回查(定期检查) P->>L: 8. 检查本地事务状态 P->>B: 9. 返回 COMMIT/ROLLBACK

2.3 事务消息实现#

// 1. 创建事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("order-producer-group");
producer.setNamesrvAddr("localhost:9876");
// 2. 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行本地事务:创建订单
Order order = JSON.parseObject(msg.getBody(), Order.class);
orderService.createOrder(order);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 回查:检查本地事务状态
String orderId = msg.getUserProperty("orderId");
Order order = orderService.getOrder(orderId);
if (order != null && order.getStatus() != OrderStatus.CANCELLED) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
});
producer.start();
// 3. 发送事务消息
Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());
msg.putUserProperty("orderId", orderId);
producer.sendMessageInTransaction(msg, null);

2.4 事务消息的关键参数#

参数默认值说明
transactionTimeout60s事务超时时间,超时后触发回查
transactionCheckMax15最大回查次数
transactionCheckInterval60s回查间隔
Warning

事务消息的回查机制不是”银弹”——如果本地事务一直返回 UNKNOWN,Broker 会持续回查直到达到最大次数。务必确保 checkLocalTransaction 方法能快速、准确地返回事务状态,避免长时间回查。

三、延迟消息#

3.1 延迟消息原理#

RocketMQ 的延迟消息通过”临时 Topic + 定时转存”实现:

graph LR P["Producer"] -->|"发送延迟消息<br/>delayLevel=3(10s)"| B["Broker"] B -->|"存入 SCHEDULE_TOPIC_XXXX<br/>QueueId = delayLevel"| ST["延迟 Topic"] ST -->|"定时任务扫描<br/>到期消息"| DT["目标 Topic"] DT --> C["Consumer"]
delayLevel延迟时间delayLevel延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

3.2 延迟消息使用#

// 发送延迟消息
DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());
msg.setDelayTimeLevel(3); // 10 秒后消费
SendResult result = producer.send(msg);
System.out.println("Send result: " + result.getSendStatus());

3.3 RocketMQ 5.x 任意延迟#

RocketMQ 5.x 支持任意延迟时间(不再限制为 18 个级别):

// RocketMQ 5.x 任意延迟
Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());
msg.setDelayTimeSec(600); // 延迟 600 秒(10 分钟)
// 或
msg.setDelayTimeMs(600000L); // 延迟 600000 毫秒
SendResult result = producer.send(msg);
版本延迟方式精度适用场景
4.x固定 18 级秒级电商超时取消
5.x任意时间毫秒级精确延迟场景

3.4 延迟消息典型场景#

// 场景:订单超时自动取消
// 1. 创建订单时发送延迟消息
public void createOrder(Order order) {
// 保存订单到数据库
orderMapper.insert(order);
// 发送 30 分钟延迟消息
Message msg = new Message("TopicOrderTimeout", order.getId().getBytes());
msg.setDelayTimeLevel(16); // 30 分钟
producer.send(msg);
}
// 2. 消费超时消息
public void onOrderTimeout(MessageExt msg) {
String orderId = new String(msg.getBody());
Order order = orderMapper.selectById(orderId);
if (order != null && order.getStatus() == OrderStatus.UNPAID) {
// 订单未支付,自动取消
order.setStatus(OrderStatus.CANCELLED);
orderMapper.updateById(order);
// 释放库存
inventoryService.release(order.getProductId(), order.getQuantity());
}
}

四、顺序消息#

4.1 全局有序 vs 分区有序#

有序级别说明性能实现方式
全局有序所有消息严格按发送顺序消费低(单队列)单 Topic 单 Queue
分区有序同一 Key 的消息按顺序消费高(多队列)同 Key 路由到同一 Queue
graph TB subgraph "全局有序(不推荐)" P1["Producer"] --> Q1["Queue 0"] Q1 --> C1["Consumer"] end subgraph "分区有序(推荐)" P2["Producer"] -->|"orderId=1001"| Q2["Queue 0"] P2 -->|"orderId=1002"| Q3["Queue 1"] P2 -->|"orderId=1001"| Q2 Q2 --> C2["Consumer 0<br/>顺序消费 orderId=1001"] Q3 --> C3["Consumer 1<br/>顺序消费 orderId=1002"] end

4.2 顺序消息实现#

// 发送顺序消息:使用 MessageQueueSelector
DefaultMQProducer producer = new DefaultMQProducer("order-producer-group");
producer.start();
// 按 orderId 选择队列,保证同一订单的消息进入同一队列
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long orderId = (Long) arg;
int index = (int) (orderId % mqs.size());
return mqs.get(index);
}
}, orderId);
// 顺序消费:使用 MessageListenerOrderly
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order-consumer-group");
consumer.subscribe("TopicOrder", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
processOrder(msg);
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();

4.3 顺序消费 vs 并发消费#

维度MessageListenerOrderlyMessageListenerConcurrently
消费顺序严格按队列顺序不保证顺序
并发度单线程消费单队列多线程消费
失败处理暂停当前队列消费重试后进入延迟队列
吞吐量
适用场景订单状态变更、数据同步普通业务消息
Tip

顺序消息的代价是牺牲并发度——同一 Queue 的消息只能单线程消费。如果追求高吞吐,优先考虑”分区有序”而非”全局有序”,通过合理的 Key 设计将有序范围缩小到业务需要的最小粒度。

五、消息过滤#

5.1 Tag 过滤#

RocketMQ 支持 Tag 级别的消息过滤,Consumer 可以只订阅特定 Tag:

// Producer:设置 Tag
Message msg = new Message("TopicOrder", "TagCreate", orderCreate.getBytes());
Message msg2 = new Message("TopicOrder", "TagPay", orderPay.getBytes());
// Consumer:按 Tag 订阅
// 方式一:订阅单个 Tag
consumer.subscribe("TopicOrder", "TagCreate");
// 方式二:订阅多个 Tag(|| 分隔)
consumer.subscribe("TopicOrder", "TagCreate||TagPay");
// 方式三:订阅所有 Tag
consumer.subscribe("TopicOrder", "*");

5.2 SQL92 过滤#

RocketMQ 支持 SQL92 表达式过滤,可以基于消息属性进行复杂过滤:

// Producer:设置属性
Message msg = new Message("TopicOrder", "TagA", orderJson.getBytes());
msg.putUserProperty("amount", "1500");
msg.putUserProperty("region", "east");
msg.putUserProperty("vip", "true");
// Consumer:SQL92 过滤
consumer.subscribe("TopicOrder",
MessageSelector.bySql("amount > 1000 AND region = 'east'"));
// 其他 SQL92 示例
MessageSelector.bySql("vip = 'true'");
MessageSelector.bySql("amount BETWEEN 100 AND 1000");
MessageSelector.bySql("region IN ('east', 'south')");
MessageSelector.bySql("amount > 500 OR vip = 'true'");
过滤方式过滤位置性能灵活性
Tag 过滤Broker + Consumer低(仅 Tag 匹配)
SQL92 过滤Broker高(属性条件组合)
# 开启 SQL92 过滤(Broker 配置)
enablePropertyFilter=true

六、消息可靠性#

6.1 Producer 端可靠性#

策略说明配置
同步发送等待 Broker 确认producer.send(msg)
重试发送失败自动重试retryTimesWhenSendFailed=3
故障延迟发送失败后延迟该 BrokersendLatencyFaultEnable=true
DefaultMQProducer producer = new DefaultMQProducer("reliable-producer-group");
producer.setRetryTimesWhenSendFailed(3); // 同步发送重试次数
producer.setRetryTimesWhenSendAsyncFailed(3); // 异步发送重试次数
producer.setSendLatencyFaultEnable(true); // 开启故障延迟
// 同步发送(最可靠)
SendResult result = producer.send(msg);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 处理发送失败
log.error("Send failed: {}", result.getSendStatus());
}

6.2 Broker 端可靠性#

策略说明配置
同步刷盘消息写入磁盘后才返回 ACKflushDiskType=SYNC_FLUSH
同步复制主从同步后才返回 ACKbrokerRole=SYNC_MASTER
消息轨迹记录消息全链路轨迹traceTopic=RMQ_SYS_TRACE_TOPIC
# Broker 配置(高可靠模式)
flushDiskType=SYNC_FLUSH # 同步刷盘
brokerRole=SYNC_MASTER # 同步复制
sendMessageThreadPoolNums=64 # 发送线程池
flushCommitLogTimed=0 # 实时刷盘

6.3 Consumer 端可靠性#

// 消费重试机制
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("reliable-consumer-group");
// 并发消费:失败后自动重试
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
processMessage(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 返回 RECONSUME_LATER,消息会进入重试队列
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// 设置最大重试次数
consumer.setMaxReconsumeTimes(16); // 默认 16 次
重试次数延迟时间重试次数延迟时间
110s97min
230s108min
31min119min
42min1210min
53min1320min
64min1430min
75min151h
86min162h
Warning

超过最大重试次数后,消息会进入死信队列(%DLQ%ConsumerGroup)。务必监控死信队列,避免消息静默丢失。

七、RocketMQ vs Kafka vs RabbitMQ#

维度RocketMQKafkaRabbitMQ
事务消息原生支持(两阶段提交)支持(事务 API)不支持
延迟消息原生支持不支持(需自行实现)插件支持
顺序消息原生支持分区有序队列有序
消息过滤Tag + SQL92
消息回溯支持按时间戳支持按 Offset不支持
吞吐量十万级 TPS百万级 TPS万级 TPS
延迟毫秒级毫秒级微秒级
协议自定义自定义AMQP
适用场景电商、金融、事务日志流、大数据业务消息、路由

八、总结#

上一章了解了RabbitMQ 与 AMQP 模型。

维度关键要点
架构NameServer(无状态路由)+ Broker(CommitLog + ConsumeQueue)
事务消息两阶段提交 + 回查机制,保证本地事务与消息发送的一致性
延迟消息临时 Topic + 定时转存,4.x 固定 18 级,5.x 任意延迟
顺序消息MessageQueueSelector + MessageListenerOrderly,分区有序
消息过滤Tag 过滤(高性能)+ SQL92 过滤(高灵活性)
可靠性同步刷盘 + 同步复制 + 消费重试 + 死信队列
Tip

RocketMQ 的核心优势是”业务友好”——事务消息解决分布式事务、延迟消息实现定时任务、顺序消息保证状态有序、SQL92 过滤减少网络传输。如果你的系统是电商/金融场景,RocketMQ 是最自然的选择。

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

RocketMQ:事务消息与延迟消息
https://blog.souloss.com/posts/messaging/rocketmq/
作者
Souloss
发布于
2026-04-06
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时