每秒 1 万笔订单,每笔订单需要同时路由到库存服务(扣库存)、支付服务(记账)和通知服务(发短信)——三条路径,三种消费模式,还要保证没消费完的消息不丢失。Kafka 的 Topic 模型做不了这件事:一个 Topic 只有一种消费模式,要三种路由就得建三个 Topic,生产者发三次。RabbitMQ 的 Exchange 天生解决这个问题——生产者发一次消息,Exchange 根据 routing key 分发到不同的 Queue,每个 Queue 独立消费、独立确认。
一、AMQP 协议与模型
1.1 AMQP 是什么?
AMQP(Advanced Message Queuing Protocol)是一个开放标准的消息协议,定义了消息的格式、路由规则和传输方式。RabbitMQ 是 AMQP 0-9-1 协议的最成熟实现。
| 特性 | AMQP 0-9-1 | AMQP 1.0 | MQTT | Kafka 协议 |
|---|---|---|---|---|
| 定位 | 消息队列协议 | 通用消息协议 | IoT 轻量协议 | 日志流协议 |
| 路由模型 | Exchange + Queue | Link + Session | Topic 订阅 | Partition |
| 事务 | 支持 | 支持 | 不支持 | 支持 |
| 流控 | Credit-based | Flow control | 简单 | 无(靠拉取) |
| 典型实现 | RabbitMQ | Azure Service Bus | Mosquitto | Kafka |
1.2 AMQP 模型核心概念
AMQP 模型的核心是”智能路由”——消息由 Exchange 根据规则路由到 Queue,消费者从 Queue 获取消息:
| 概念 | 说明 | 类比 |
|---|---|---|
| Exchange | 消息路由器,根据规则分发消息 | 邮局分拣中心 |
| Queue | 消息缓冲区,存储待消费的消息 | 收件箱 |
| Binding | Exchange 与 Queue 的绑定规则 | 分拣规则 |
| Routing Key | 消息的路由键 | 信封上的地址 |
| Virtual Host | 虚拟主机,隔离 Exchange/Queue/权限 | 独立邮局 |
AMQP 与 Kafka 的根本区别在于路由模型:Kafka 的 Topic 是”分区日志”,消费者主动拉取;RabbitMQ 的 Exchange 是”智能路由”,消息被推送到匹配的 Queue。Kafka 适合高吞吐的流式场景,RabbitMQ 适合灵活路由的业务消息场景。
二、Exchange 类型详解
2.1 四种 Exchange 类型
RabbitMQ 提供四种内置 Exchange 类型,每种有不同的路由规则:
| Exchange 类型 | 路由规则 | 适用场景 | 性能 |
|---|---|---|---|
| direct | routing key 精确匹配 | 点对点、任务分发 | 最高 |
| fanout | 广播到所有绑定队列 | 广播通知、日志分发 | 高 |
| topic | routing key 模式匹配 | 发布订阅、多维度路由 | 中 |
| headers | 消息头属性匹配 | 复杂条件路由 | 低 |
// 声明 Exchangechannel.exchangeDeclare("order-direct", "direct", true); // 持久化channel.exchangeDeclare("log-fanout", "fanout", true);channel.exchangeDeclare("event-topic", "topic", true);channel.exchangeDeclare="route-headers", "headers", true);2.2 Direct Exchange
Direct Exchange 根据 routing key 精确匹配 Binding Key:
// 声明 Direct Exchange 和绑定channel.exchangeDeclare("order-exchange", "direct", true);
// 声明队列channel.queueDeclare("new-orders", true, false, false, null);channel.queueDeclare("paid-orders", true, false, false, null);channel.queueDeclare("cancel-orders", true, false, false, null);
// 绑定:routing key 精确匹配channel.queueBind("new-orders", "order-exchange", "order.new");channel.queueBind("paid-orders", "order-exchange", "order.paid");channel.queueBind("cancel-orders", "order-exchange", "order.cancel");
// 发送消息channel.basicPublish("order-exchange", "order.new", null, orderJson.getBytes());2.3 Fanout Exchange
Fanout Exchange 忽略 routing key,将消息广播到所有绑定的队列:
// 声明 Fanout Exchangechannel.exchangeDeclare("log-fanout", "fanout", true);
// 多个队列绑定到同一个 Fanout Exchangechannel.queueDeclare("log-file", true, false, false, null);channel.queueDeclare("log-elasticsearch", true, false, false, null);channel.queueDeclare("log-monitor", true, false, false, null);
// 绑定:无需 routing keychannel.queueBind("log-file", "log-fanout", "");channel.queueBind("log-elasticsearch", "log-fanout", "");channel.queueBind("log-monitor", "log-fanout", "");
// 发送消息——所有队列都会收到channel.basicPublish("log-fanout", "", null, logData.getBytes());| 特性 | 说明 |
|---|---|
| routing key | 被忽略 |
| 绑定数量 | 不限,所有绑定队列都收到消息 |
| 典型场景 | 日志分发、事件广播、配置更新通知 |
2.4 Topic Exchange
Topic Exchange 支持 routing key 的模式匹配,使用通配符:
| 通配符 | 含义 | 示例 |
|---|---|---|
* | 匹配一个单词 | order.* 匹配 order.new,不匹配 order.new.vip |
# | 匹配零或多个单词 | order.# 匹配 order.new、order.new.vip |
// 声明 Topic Exchangechannel.exchangeDeclare("event-topic", "topic", true);
// 绑定:不同消费者订阅不同模式channel.queueBind("all-orders", "event-topic", "order.#"); // 所有订单事件channel.queueBind("new-orders", "event-topic", "order.new"); // 仅新订单channel.queueBind("vip-orders", "event-topic", "order.*.vip"); // VIP 订单channel.queueBind("all-events", "event-topic", "#"); // 所有事件
// 发送消息channel.basicPublish("event-topic", "order.new.vip", null, eventJson.getBytes());// 匹配:all-orders(order.#)、vip-orders(order.*.vip)、all-events(#)2.5 Headers Exchange
Headers Exchange 根据消息头属性匹配,适用于复杂条件路由:
// 声明 Headers Exchangechannel.exchangeDeclare("route-headers", "headers", true);
// 绑定:匹配条件Map<String, Object> bindingArgs1 = new HashMap<>();bindingArgs1.put("x-match", "all"); // 所有头都匹配bindingArgs1.put("format", "pdf");bindingArgs1.put("type", "report");
Map<String, Object> bindingArgs2 = new HashMap<>();bindingArgs2.put("x-match", "any"); // 任一头匹配bindingArgs2.put("format", "pdf");bindingArgs2.put("format", "csv");
channel.queueBind("pdf-reports", "route-headers", "", bindingArgs1);channel.queueBind("data-files", "route-headers", "", bindingArgs2);
// 发送消息AMQP.BasicProperties props = new AMQP.BasicProperties.Builder() .headers(Map.of("format", "pdf", "type", "report")) .build();channel.basicPublish("route-headers", "", props, data.getBytes());| x-match 模式 | 含义 | 性能 |
|---|---|---|
all | 所有头属性都匹配 | 较低 |
any | 任一头属性匹配 | 较低 |
Headers Exchange 的匹配性能低于其他 Exchange 类型,因为需要解析和比较消息头。除非路由逻辑确实需要基于多属性匹配,否则优先使用 direct 或 topic Exchange。
三、队列类型与特性
3.1 队列类型
RabbitMQ 3.12+ 提供三种队列类型:
| 队列类型 | 存储 | 持久化 | 性能 | 适用场景 |
|---|---|---|---|---|
| Classic Queue | 内存 + 磁盘 | 可选 | 高 | 通用场景 |
| Quorum Queue | 多节点 Raft 日志 | 始终持久化 | 中 | 高可靠场景 |
| Stream Queue | 仅追加日志 | 始终持久化 | 高 | 高吞吐、消费回放 |
// 声明 Classic Queuechannel.queueDeclare("classic-queue", true, false, false, null);
// 声明 Quorum Queue(通过参数指定)Map<String, Object> quorumArgs = new HashMap<>();quorumArgs.put("x-queue-type", "quorum");quorumArgs.put("x-delivery-limit", 3); // 投递次数限制channel.queueDeclare("quorum-queue", true, false, false, quorumArgs);
// 声明 Stream QueueMap<String, Object> streamArgs = new HashMap<>();streamArgs.put("x-queue-type", "stream");streamArgs.put("x-max-length-bytes", 1073741824L); // 1GBstreamArgs.put("x-stream-max-segment-size-bytes", 67108864L); // 64MBchannel.queueDeclare("event-stream", true, false, false, streamArgs);3.2 Quorum Queue 详解
Quorum Queue 是 RabbitMQ 的强一致性队列,使用 Raft 协议实现副本:
| 特性 | Classic Queue | Quorum Queue |
|---|---|---|
| 副本 | 可选(镜像队列,已废弃) | 始终多副本(Raft) |
| 一致性 | 最终一致 | 强一致 |
| 消息持久化 | 可选 | 始终持久化 |
| 投递限制 | 无 | x-delivery-limit |
| 死信 | 支持 | 支持 |
| 性能 | 高 | 中(Raft 共识开销) |
# 查看 Quorum Queue 状态rabbitmq-ctl list_queues name type quorum_status leader members
# 输出示例:# quorum-queue quorum running rabbit@node1 [rabbit@node1, rabbit@node2, rabbit@node3]3.3 Stream Queue 详解
Stream Queue 是 RabbitMQ 3.9+ 引入的仅追加日志队列,类似 Kafka 的分区日志:
// Stream 消费:支持偏移量消费channel.basicQos(100); // 预取
// 从指定偏移量开始消费Map<String, Object> consumeArgs = new HashMap<>();consumeArgs.put("x-stream-offset", "first"); // 从第一条开始// consumeArgs.put("x-stream-offset", "last"); // 从最新开始// consumeArgs.put("x-stream-offset", 12345L); // 从指定偏移量开始// consumeArgs.put("x-stream-offset", timestamp); // 从指定时间开始
channel.basicConsume("event-stream", false, consumeArgs, consumer);| 特性 | Stream Queue | Kafka Partition |
|---|---|---|
| 存储 | 仅追加日志 | 仅追加日志 |
| 消费模式 | 偏移量消费 | 偏移量消费 |
| 保留策略 | 大小/时间 | 大小/时间 |
| 协议 | AMQP | 自定义协议 |
| 生态 | RabbitMQ 生态 | Kafka 生态 |
四、消息确认机制
4.1 Producer 确认
RabbitMQ 提供两种 Producer 确认机制:
| 机制 | 说明 | 可靠性 | 性能 |
|---|---|---|---|
| Transaction | 事务提交/回滚 | 高 | 低 |
| Publisher Confirm | 异步确认 | 高 | 高 |
// 方式一:事务(不推荐,性能差)channel.txSelect(); // 开启事务try { channel.basicPublish("order-exchange", "order.new", null, data.getBytes()); channel.txCommit(); // 提交} catch (Exception e) { channel.txRollback(); // 回滚}
// 方式二:Publisher Confirm(推荐)channel.confirmSelect(); // 开启确认模式
// 异步确认回调channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { // 消息确认成功 System.out.println("Message confirmed: " + deliveryTag); }
@Override public void handleNack(long deliveryTag, boolean multiple) { // 消息确认失败,需要重发 System.out.println("Message rejected: " + deliveryTag); resendMessage(deliveryTag); }});
// 发送消息channel.basicPublish("order-exchange", "order.new", null, data.getBytes());
// 批量确认channel.waitForConfirmsOrDie(5000); // 最多等待 5 秒4.2 Consumer 确认
Consumer 确认(ACK)告诉 RabbitMQ 消息已成功处理:
// 手动确认模式(推荐)boolean autoAck = false;channel.basicConsume("order-queue", autoAck, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { try { processOrder(body); // 处理成功,确认消息 channel.basicAck(envelope.getDeliveryTag(), false); } catch (Exception e) { // 处理失败,拒绝消息并重新入队 channel.basicNack(envelope.getDeliveryTag(), false, true); // 或拒绝不重新入队(进入死信队列) // channel.basicReject(envelope.getDeliveryTag(), false); } }});| 确认方式 | 方法 | 说明 |
|---|---|---|
| 确认 | basicAck | 消息处理成功 |
| 拒绝(单条) | basicReject | 拒绝单条消息,可选择重入队 |
| 拒绝(批量) | basicNack | 拒绝多条消息,可选择重入队 |
五、死信队列
5.1 死信的产生
消息变成”死信”(Dead Letter)的三种情况:
| 死信原因 | 触发条件 | 说明 |
|---|---|---|
| 消息被拒绝 | basicReject / basicNack 且 requeue=false | 消费者显式拒绝 |
| 消息过期 | TTL 到期 | 消息在队列中存活时间超过 TTL |
| 队列满 | 达到最大长度 | 最老的消息被丢弃 |
// 声明带死信的队列Map<String, Object> queueArgs = new HashMap<>();queueArgs.put("x-dead-letter-exchange", "dlx-exchange"); // 死信 ExchangequeueArgs.put("x-dead-letter-routing-key", "dead.order"); // 死信 routing keyqueueArgs.put("x-message-ttl", 60000); // 消息 TTL 60 秒queueArgs.put("x-max-length", 10000); // 队列最大长度
channel.queueDeclare("order-queue", true, false, false, queueArgs);
// 声明死信 Exchange 和队列channel.exchangeDeclare("dlx-exchange", "direct", true);channel.queueDeclare("dead-letter-queue", true, false, false, null);channel.queueBind("dead-letter-queue", "dlx-exchange", "dead.order");5.2 死信处理策略
// 死信处理器channel.basicConsume("dead-letter-queue", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { // 获取死信原因 String reason = properties.getHeaders().get("x-death").toString(); int retryCount = getRetryCount(properties);
if (retryCount < 3) { // 重试:重新发送到业务 Exchange channel.basicPublish("order-exchange", "order.new", properties, body); } else { // 超过重试次数:记录到数据库并发送告警 saveToDatabase(body, reason); sendAlert("Dead letter exceeded retry limit", body); }
channel.basicAck(envelope.getDeliveryTag(), false); }});死信队列是消息系统的”安全网”——永远不要丢弃消息而不处理。通过死信队列,你可以:1)自动重试暂时失败的消息;2)对永久失败的消息进行人工干预;3)监控消息处理失败率,及时发现系统问题。
六、RabbitMQ 集群与高可用
6.1 集群架构
RabbitMQ 集群由多个节点组成,队列数据可以分布在不同的节点上:
| 节点类型 | 说明 | 适用场景 |
|---|---|---|
| Disc Node | 元数据 + 消息持久化到磁盘 | 生产环境(至少一个) |
| RAM Node | 元数据仅存内存 | 性能优先(不推荐生产使用) |
# 加入集群rabbitmqctl stop_apprabbitmqctl join_cluster rabbit@node1rabbitmqctl start_app
# 查看集群状态rabbitmqctl cluster_status
# 查看队列分布rabbitmqctl list_queues name pid owner_pid6.2 Quorum Queue 高可用
Quorum Queue 使用 Raft 协议实现自动故障转移:
# 声明 Quorum Queue(3 副本)# 通过策略设置rabbitmqctl set_policy ha-quorum ".*" '{"queue-type":"quorum"}' --apply-to queues
# 查看 Quorum Queue 状态rabbitmq-ctl list_queues name type quorum_status leader members
# 手动触发 Leader 切换rabbitmq-ctl force_leader_of_quorum_queue quorum-queue| 场景 | Classic Queue(镜像) | Quorum Queue |
|---|---|---|
| Leader 故障 | 需要手动/策略切换 | Raft 自动选举 |
| 数据一致性 | 最终一致 | 强一致(Raft) |
| 脑裂处理 | 可能丢失消息 | Raft 多数派保证 |
| 推荐度 | 已废弃 | 推荐 |
七、性能调优
7.1 Producer 优化
| 优化项 | 配置 | 效果 |
|---|---|---|
| 批量发送 | 循环内 publish,循环外 confirm | 3-10x 吞吐提升 |
| 消息持久化 | deliveryMode=2 | 可靠但慢 |
| 异步确认 | addConfirmListener | 不阻塞发送线程 |
| 预发布大小 | channel 限制 | 控制内存使用 |
// 批量发送优化channel.confirmSelect();int batchSize = 100;
for (int i = 0; i < 10000; i++) { channel.basicPublish("exchange", "routing.key", null, message.getBytes()); if ((i + 1) % batchSize == 0) { channel.waitForConfirmsOrDie(5000); // 每 100 条确认一次 }}7.2 Consumer 优化
| 优化项 | 配置 | 效果 |
|---|---|---|
| 预取数量 | basicQos(prefetchCount) | 平衡吞吐与公平性 |
| 手动确认 | autoAck=false | 可靠消费 |
| 批量确认 | multiple=true | 减少 ACK 网络开销 |
| 并发消费 | 多线程/多连接 | 提高消费速率 |
// 预取优化// prefetchCount = 1:严格轮询,公平但慢// prefetchCount = 10-100:平衡吞吐和公平// prefetchCount = 0:无限制,快但可能不均衡channel.basicQos(50); // 推荐 50-1007.3 队列与 Exchange 优化
| 优化项 | 配置 | 效果 |
|---|---|---|
| 惰性队列 | x-queue-mode=lazy | 减少内存占用 |
| 消息 TTL | x-message-ttl | 自动过期,减少存储 |
| 队列最大长度 | x-max-length | 防止队列无限增长 |
| 消息压缩 | 应用层压缩 | 减少网络和存储开销 |
// 惰性队列:消息直接写磁盘,减少内存占用Map<String, Object> lazyArgs = new HashMap<>();lazyArgs.put("x-queue-mode", "lazy");channel.queueDeclare("large-queue", true, false, false, lazyArgs);八、RabbitMQ vs Kafka 对比
| 维度 | RabbitMQ | Kafka |
|---|---|---|
| 模型 | Exchange → Queue → Consumer | Topic → Partition → Consumer Group |
| 路由 | 智能路由(4 种 Exchange) | 无路由(按 Partition 分配) |
| 消息保留 | 消费后删除 | 按时间/大小保留 |
| 消费模式 | 推模式(Push) | 拉模式(Pull) |
| 回放 | 不支持(消费后删除) | 支持(保留日志可重放) |
| 吞吐量 | 万级~十万级 TPS | 百万级 TPS |
| 延迟 | 微秒级 | 毫秒级 |
| 有序性 | 单队列有序 | 单分区有序 |
| 事务 | 支持 | 支持 |
| 适用场景 | 业务消息、灵活路由、RPC | 日志流、事件流、大数据 |
选择 RabbitMQ 还是 Kafka 不是”哪个更好”的问题,而是”场景匹配”的问题。RabbitMQ 擅长灵活路由、低延迟、消息确认;Kafka 擅长高吞吐、日志保留、流处理。很多生产系统两者并存——Kafka 处理事件流,RabbitMQ 处理业务消息。
九、总结
上一章理解了Kafka Streams 流处理。
| 维度 | 关键要点 |
|---|---|
| AMQP 模型 | Exchange(路由)+ Queue(缓冲)+ Binding(规则) |
| Exchange 类型 | direct(精确)、fanout(广播)、topic(模式)、headers(头匹配) |
| 队列类型 | Classic(通用)、Quorum(强一致)、Stream(高吞吐) |
| 消息确认 | Publisher Confirm + Consumer ACK = 端到端可靠 |
| 死信队列 | 消息拒绝/过期/溢出 → 死信 Exchange → 死信队列 → 人工处理 |
| 高可用 | Quorum Queue + Raft 自动故障转移 |
| 性能调优 | 批量发送、合理预取、惰性队列 |
RabbitMQ 的核心优势是灵活的路由模型和丰富的消息确认机制。如果你的系统需要”消息必须精确路由到特定消费者”或”每条消息都必须被确认”,RabbitMQ 是比 Kafka 更自然的选择。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






