mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2375 字
7 分钟
RabbitMQ:AMQP 与交换机
2026-04-06

每秒 1 万笔订单,每笔订单需要同时路由到库存服务(扣库存)、支付服务(记账)和通知服务(发短信)——三条路径,三种消费模式,还要保证没消费完的消息不丢失。Kafka 的 Topic 模型做不了这件事:一个 Topic 只有一种消费模式,要三种路由就得建三个 Topic,生产者发三次。RabbitMQ 的 Exchange 天生解决这个问题——生产者发一次消息,Exchange 根据 routing key 分发到不同的 Queue,每个 Queue 独立消费、独立确认。

一、AMQP 协议与模型#

1.1 AMQP 是什么?#

AMQP(Advanced Message Queuing Protocol)是一个开放标准的消息协议,定义了消息的格式、路由规则和传输方式。RabbitMQ 是 AMQP 0-9-1 协议的最成熟实现。

特性AMQP 0-9-1AMQP 1.0MQTTKafka 协议
定位消息队列协议通用消息协议IoT 轻量协议日志流协议
路由模型Exchange + QueueLink + SessionTopic 订阅Partition
事务支持支持不支持支持
流控Credit-basedFlow control简单无(靠拉取)
典型实现RabbitMQAzure Service BusMosquittoKafka

1.2 AMQP 模型核心概念#

AMQP 模型的核心是”智能路由”——消息由 Exchange 根据规则路由到 Queue,消费者从 Queue 获取消息:

graph LR P["Producer"] -->|发布消息| E["Exchange<br/>(交换机)"] E -->|routing key = order.new| Q1["Queue: order-new"] E -->|routing key = order.paid| Q2["Queue: order-paid"] E -->|routing key = order.#| Q3["Queue: order-all"] Q1 --> C1["Consumer A"] Q2 --> C2["Consumer B"] Q3 --> C3["Consumer C"]
概念说明类比
Exchange消息路由器,根据规则分发消息邮局分拣中心
Queue消息缓冲区,存储待消费的消息收件箱
BindingExchange 与 Queue 的绑定规则分拣规则
Routing Key消息的路由键信封上的地址
Virtual Host虚拟主机,隔离 Exchange/Queue/权限独立邮局
Note

AMQP 与 Kafka 的根本区别在于路由模型:Kafka 的 Topic 是”分区日志”,消费者主动拉取;RabbitMQ 的 Exchange 是”智能路由”,消息被推送到匹配的 Queue。Kafka 适合高吞吐的流式场景,RabbitMQ 适合灵活路由的业务消息场景。

二、Exchange 类型详解#

2.1 四种 Exchange 类型#

RabbitMQ 提供四种内置 Exchange 类型,每种有不同的路由规则:

Exchange 类型路由规则适用场景性能
directrouting key 精确匹配点对点、任务分发最高
fanout广播到所有绑定队列广播通知、日志分发
topicrouting key 模式匹配发布订阅、多维度路由
headers消息头属性匹配复杂条件路由
// 声明 Exchange
channel.exchangeDeclare("order-direct", "direct", true); // 持久化
channel.exchangeDeclare("log-fanout", "fanout", true);
channel.exchangeDeclare("event-topic", "topic", true);
channel.exchangeDeclare="route-headers", "headers", true);

2.2 Direct Exchange#

Direct Exchange 根据 routing key 精确匹配 Binding Key:

graph LR P["Producer"] --> E["Direct Exchange<br/>order-exchange"] E -->|"order.new"| Q1["Queue: new-orders"] E -->|"order.paid"| Q2["Queue: paid-orders"] E -->|"order.cancel"| Q3["Queue: cancel-orders"]
// 声明 Direct Exchange 和绑定
channel.exchangeDeclare("order-exchange", "direct", true);
// 声明队列
channel.queueDeclare("new-orders", true, false, false, null);
channel.queueDeclare("paid-orders", true, false, false, null);
channel.queueDeclare("cancel-orders", true, false, false, null);
// 绑定:routing key 精确匹配
channel.queueBind("new-orders", "order-exchange", "order.new");
channel.queueBind("paid-orders", "order-exchange", "order.paid");
channel.queueBind("cancel-orders", "order-exchange", "order.cancel");
// 发送消息
channel.basicPublish("order-exchange", "order.new", null, orderJson.getBytes());

2.3 Fanout Exchange#

Fanout Exchange 忽略 routing key,将消息广播到所有绑定的队列:

// 声明 Fanout Exchange
channel.exchangeDeclare("log-fanout", "fanout", true);
// 多个队列绑定到同一个 Fanout Exchange
channel.queueDeclare("log-file", true, false, false, null);
channel.queueDeclare("log-elasticsearch", true, false, false, null);
channel.queueDeclare("log-monitor", true, false, false, null);
// 绑定:无需 routing key
channel.queueBind("log-file", "log-fanout", "");
channel.queueBind("log-elasticsearch", "log-fanout", "");
channel.queueBind("log-monitor", "log-fanout", "");
// 发送消息——所有队列都会收到
channel.basicPublish("log-fanout", "", null, logData.getBytes());
特性说明
routing key被忽略
绑定数量不限,所有绑定队列都收到消息
典型场景日志分发、事件广播、配置更新通知

2.4 Topic Exchange#

Topic Exchange 支持 routing key 的模式匹配,使用通配符:

通配符含义示例
*匹配一个单词order.* 匹配 order.new,不匹配 order.new.vip
#匹配零或多个单词order.# 匹配 order.neworder.new.vip
// 声明 Topic Exchange
channel.exchangeDeclare("event-topic", "topic", true);
// 绑定:不同消费者订阅不同模式
channel.queueBind("all-orders", "event-topic", "order.#"); // 所有订单事件
channel.queueBind("new-orders", "event-topic", "order.new"); // 仅新订单
channel.queueBind("vip-orders", "event-topic", "order.*.vip"); // VIP 订单
channel.queueBind("all-events", "event-topic", "#"); // 所有事件
// 发送消息
channel.basicPublish("event-topic", "order.new.vip", null, eventJson.getBytes());
// 匹配:all-orders(order.#)、vip-orders(order.*.vip)、all-events(#)
graph LR P["Producer"] --> E["Topic Exchange<br/>event-topic"] E -->|"order.new.vip"| Q1["all-orders<br/>order.#"] E -->|"order.new.vip"| Q2["vip-orders<br/>order.*.vip"] E -->|"order.new.vip"| Q3["all-events<br/>#"] E -.-x|"不匹配"| Q4["new-orders<br/>order.new"]

2.5 Headers Exchange#

Headers Exchange 根据消息头属性匹配,适用于复杂条件路由:

// 声明 Headers Exchange
channel.exchangeDeclare("route-headers", "headers", true);
// 绑定:匹配条件
Map<String, Object> bindingArgs1 = new HashMap<>();
bindingArgs1.put("x-match", "all"); // 所有头都匹配
bindingArgs1.put("format", "pdf");
bindingArgs1.put("type", "report");
Map<String, Object> bindingArgs2 = new HashMap<>();
bindingArgs2.put("x-match", "any"); // 任一头匹配
bindingArgs2.put("format", "pdf");
bindingArgs2.put("format", "csv");
channel.queueBind("pdf-reports", "route-headers", "", bindingArgs1);
channel.queueBind("data-files", "route-headers", "", bindingArgs2);
// 发送消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Map.of("format", "pdf", "type", "report"))
.build();
channel.basicPublish("route-headers", "", props, data.getBytes());
x-match 模式含义性能
all所有头属性都匹配较低
any任一头属性匹配较低
Warning

Headers Exchange 的匹配性能低于其他 Exchange 类型,因为需要解析和比较消息头。除非路由逻辑确实需要基于多属性匹配,否则优先使用 direct 或 topic Exchange。

三、队列类型与特性#

3.1 队列类型#

RabbitMQ 3.12+ 提供三种队列类型:

队列类型存储持久化性能适用场景
Classic Queue内存 + 磁盘可选通用场景
Quorum Queue多节点 Raft 日志始终持久化高可靠场景
Stream Queue仅追加日志始终持久化高吞吐、消费回放
// 声明 Classic Queue
channel.queueDeclare("classic-queue", true, false, false, null);
// 声明 Quorum Queue(通过参数指定)
Map<String, Object> quorumArgs = new HashMap<>();
quorumArgs.put("x-queue-type", "quorum");
quorumArgs.put("x-delivery-limit", 3); // 投递次数限制
channel.queueDeclare("quorum-queue", true, false, false, quorumArgs);
// 声明 Stream Queue
Map<String, Object> streamArgs = new HashMap<>();
streamArgs.put("x-queue-type", "stream");
streamArgs.put("x-max-length-bytes", 1073741824L); // 1GB
streamArgs.put("x-stream-max-segment-size-bytes", 67108864L); // 64MB
channel.queueDeclare("event-stream", true, false, false, streamArgs);

3.2 Quorum Queue 详解#

Quorum Queue 是 RabbitMQ 的强一致性队列,使用 Raft 协议实现副本:

graph TB subgraph "Quorum Queue (Raft Group)" L["Leader<br/>Node 1"] F1["Follower<br/>Node 2"] F2["Follower<br/>Node 3"] end P["Producer"] -->|写入| L L -->|复制| F1 L -->|复制| F2 F1 -->|ACK| L F2 -->|ACK| L L -->|确认| P C["Consumer"] -->|消费| L
特性Classic QueueQuorum Queue
副本可选(镜像队列,已废弃)始终多副本(Raft)
一致性最终一致强一致
消息持久化可选始终持久化
投递限制x-delivery-limit
死信支持支持
性能中(Raft 共识开销)
# 查看 Quorum Queue 状态
rabbitmq-ctl list_queues name type quorum_status leader members
# 输出示例:
# quorum-queue quorum running rabbit@node1 [rabbit@node1, rabbit@node2, rabbit@node3]

3.3 Stream Queue 详解#

Stream Queue 是 RabbitMQ 3.9+ 引入的仅追加日志队列,类似 Kafka 的分区日志:

// Stream 消费:支持偏移量消费
channel.basicQos(100); // 预取
// 从指定偏移量开始消费
Map<String, Object> consumeArgs = new HashMap<>();
consumeArgs.put("x-stream-offset", "first"); // 从第一条开始
// consumeArgs.put("x-stream-offset", "last"); // 从最新开始
// consumeArgs.put("x-stream-offset", 12345L); // 从指定偏移量开始
// consumeArgs.put("x-stream-offset", timestamp); // 从指定时间开始
channel.basicConsume("event-stream", false, consumeArgs, consumer);
特性Stream QueueKafka Partition
存储仅追加日志仅追加日志
消费模式偏移量消费偏移量消费
保留策略大小/时间大小/时间
协议AMQP自定义协议
生态RabbitMQ 生态Kafka 生态

四、消息确认机制#

4.1 Producer 确认#

RabbitMQ 提供两种 Producer 确认机制:

机制说明可靠性性能
Transaction事务提交/回滚
Publisher Confirm异步确认
// 方式一:事务(不推荐,性能差)
channel.txSelect(); // 开启事务
try {
channel.basicPublish("order-exchange", "order.new", null, data.getBytes());
channel.txCommit(); // 提交
} catch (Exception e) {
channel.txRollback(); // 回滚
}
// 方式二:Publisher Confirm(推荐)
channel.confirmSelect(); // 开启确认模式
// 异步确认回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// 消息确认成功
System.out.println("Message confirmed: " + deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
// 消息确认失败,需要重发
System.out.println("Message rejected: " + deliveryTag);
resendMessage(deliveryTag);
}
});
// 发送消息
channel.basicPublish("order-exchange", "order.new", null, data.getBytes());
// 批量确认
channel.waitForConfirmsOrDie(5000); // 最多等待 5 秒

4.2 Consumer 确认#

Consumer 确认(ACK)告诉 RabbitMQ 消息已成功处理:

// 手动确认模式(推荐)
boolean autoAck = false;
channel.basicConsume("order-queue", autoAck, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
try {
processOrder(body);
// 处理成功,确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
// 处理失败,拒绝消息并重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
// 或拒绝不重新入队(进入死信队列)
// channel.basicReject(envelope.getDeliveryTag(), false);
}
}
});
确认方式方法说明
确认basicAck消息处理成功
拒绝(单条)basicReject拒绝单条消息,可选择重入队
拒绝(批量)basicNack拒绝多条消息,可选择重入队
sequenceDiagram participant P as Producer participant R as RabbitMQ participant C as Consumer P->>R: 发布消息 R-->>P: Publisher Confirm (ACK) R->>C: 投递消息 alt 处理成功 C->>R: basicAck R->>R: 删除消息 else 处理失败 C->>R: basicNack(requeue=true) R->>C: 重新投递 else 处理失败(不重试) C->>R: basicReject(requeue=false) R->>R: 转入死信队列 end

五、死信队列#

5.1 死信的产生#

消息变成”死信”(Dead Letter)的三种情况:

死信原因触发条件说明
消息被拒绝basicReject / basicNackrequeue=false消费者显式拒绝
消息过期TTL 到期消息在队列中存活时间超过 TTL
队列满达到最大长度最老的消息被丢弃
// 声明带死信的队列
Map<String, Object> queueArgs = new HashMap<>();
queueArgs.put("x-dead-letter-exchange", "dlx-exchange"); // 死信 Exchange
queueArgs.put("x-dead-letter-routing-key", "dead.order"); // 死信 routing key
queueArgs.put("x-message-ttl", 60000); // 消息 TTL 60 秒
queueArgs.put("x-max-length", 10000); // 队列最大长度
channel.queueDeclare("order-queue", true, false, false, queueArgs);
// 声明死信 Exchange 和队列
channel.exchangeDeclare("dlx-exchange", "direct", true);
channel.queueDeclare("dead-letter-queue", true, false, false, null);
channel.queueBind("dead-letter-queue", "dlx-exchange", "dead.order");

5.2 死信处理策略#

graph TB Q["业务队列<br/>order-queue"] -->|"消息过期/拒绝/溢出"| DLX["死信 Exchange<br/>dlx-exchange"] DLX --> DLQ["死信队列<br/>dead-letter-queue"] DLQ --> HANDLER["死信处理器"] HANDLER -->|"重试"| RETRY["重试 Exchange"] RETRY --> Q HANDLER -->|"告警"| ALERT["告警系统"] HANDLER -->|"记录"| DB["数据库"]
// 死信处理器
channel.basicConsume("dead-letter-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
// 获取死信原因
String reason = properties.getHeaders().get("x-death").toString();
int retryCount = getRetryCount(properties);
if (retryCount < 3) {
// 重试:重新发送到业务 Exchange
channel.basicPublish("order-exchange", "order.new",
properties, body);
} else {
// 超过重试次数:记录到数据库并发送告警
saveToDatabase(body, reason);
sendAlert("Dead letter exceeded retry limit", body);
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
Tip

死信队列是消息系统的”安全网”——永远不要丢弃消息而不处理。通过死信队列,你可以:1)自动重试暂时失败的消息;2)对永久失败的消息进行人工干预;3)监控消息处理失败率,及时发现系统问题。

六、RabbitMQ 集群与高可用#

6.1 集群架构#

RabbitMQ 集群由多个节点组成,队列数据可以分布在不同的节点上:

graph TB subgraph "RabbitMQ 集群" N1["Node 1<br/>Disc + RAM"] N2["Node 2<br/>Disc + RAM"] N3["Node 3<br/>Disc + RAM"] end N1 <-->|"Erlang Cookie"| N2 N2 <-->|"Erlang Cookie"| N3 N1 <-->|"Erlang Cookie"| N3 LB["Load Balancer<br/>(HAProxy/Nginx)"] --> N1 LB --> N2 LB --> N3 P["Producer"] --> LB C["Consumer"] --> LB
节点类型说明适用场景
Disc Node元数据 + 消息持久化到磁盘生产环境(至少一个)
RAM Node元数据仅存内存性能优先(不推荐生产使用)
# 加入集群
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app
# 查看集群状态
rabbitmqctl cluster_status
# 查看队列分布
rabbitmqctl list_queues name pid owner_pid

6.2 Quorum Queue 高可用#

Quorum Queue 使用 Raft 协议实现自动故障转移:

# 声明 Quorum Queue(3 副本)
# 通过策略设置
rabbitmqctl set_policy ha-quorum ".*" '{"queue-type":"quorum"}' --apply-to queues
# 查看 Quorum Queue 状态
rabbitmq-ctl list_queues name type quorum_status leader members
# 手动触发 Leader 切换
rabbitmq-ctl force_leader_of_quorum_queue quorum-queue
场景Classic Queue(镜像)Quorum Queue
Leader 故障需要手动/策略切换Raft 自动选举
数据一致性最终一致强一致(Raft)
脑裂处理可能丢失消息Raft 多数派保证
推荐度已废弃推荐

七、性能调优#

7.1 Producer 优化#

优化项配置效果
批量发送循环内 publish,循环外 confirm3-10x 吞吐提升
消息持久化deliveryMode=2可靠但慢
异步确认addConfirmListener不阻塞发送线程
预发布大小channel 限制控制内存使用
// 批量发送优化
channel.confirmSelect();
int batchSize = 100;
for (int i = 0; i < 10000; i++) {
channel.basicPublish("exchange", "routing.key", null, message.getBytes());
if ((i + 1) % batchSize == 0) {
channel.waitForConfirmsOrDie(5000); // 每 100 条确认一次
}
}

7.2 Consumer 优化#

优化项配置效果
预取数量basicQos(prefetchCount)平衡吞吐与公平性
手动确认autoAck=false可靠消费
批量确认multiple=true减少 ACK 网络开销
并发消费多线程/多连接提高消费速率
// 预取优化
// prefetchCount = 1:严格轮询,公平但慢
// prefetchCount = 10-100:平衡吞吐和公平
// prefetchCount = 0:无限制,快但可能不均衡
channel.basicQos(50); // 推荐 50-100

7.3 队列与 Exchange 优化#

优化项配置效果
惰性队列x-queue-mode=lazy减少内存占用
消息 TTLx-message-ttl自动过期,减少存储
队列最大长度x-max-length防止队列无限增长
消息压缩应用层压缩减少网络和存储开销
// 惰性队列:消息直接写磁盘,减少内存占用
Map<String, Object> lazyArgs = new HashMap<>();
lazyArgs.put("x-queue-mode", "lazy");
channel.queueDeclare("large-queue", true, false, false, lazyArgs);

八、RabbitMQ vs Kafka 对比#

维度RabbitMQKafka
模型Exchange → Queue → ConsumerTopic → Partition → Consumer Group
路由智能路由(4 种 Exchange)无路由(按 Partition 分配)
消息保留消费后删除按时间/大小保留
消费模式推模式(Push)拉模式(Pull)
回放不支持(消费后删除)支持(保留日志可重放)
吞吐量万级~十万级 TPS百万级 TPS
延迟微秒级毫秒级
有序性单队列有序单分区有序
事务支持支持
适用场景业务消息、灵活路由、RPC日志流、事件流、大数据
Note

选择 RabbitMQ 还是 Kafka 不是”哪个更好”的问题,而是”场景匹配”的问题。RabbitMQ 擅长灵活路由、低延迟、消息确认;Kafka 擅长高吞吐、日志保留、流处理。很多生产系统两者并存——Kafka 处理事件流,RabbitMQ 处理业务消息。

九、总结#

上一章理解了Kafka Streams 流处理。

维度关键要点
AMQP 模型Exchange(路由)+ Queue(缓冲)+ Binding(规则)
Exchange 类型direct(精确)、fanout(广播)、topic(模式)、headers(头匹配)
队列类型Classic(通用)、Quorum(强一致)、Stream(高吞吐)
消息确认Publisher Confirm + Consumer ACK = 端到端可靠
死信队列消息拒绝/过期/溢出 → 死信 Exchange → 死信队列 → 人工处理
高可用Quorum Queue + Raft 自动故障转移
性能调优批量发送、合理预取、惰性队列
Tip

RabbitMQ 的核心优势是灵活的路由模型和丰富的消息确认机制。如果你的系统需要”消息必须精确路由到特定消费者”或”每条消息都必须被确认”,RabbitMQ 是比 Kafka 更自然的选择。

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

RabbitMQ:AMQP 与交换机
https://blog.souloss.com/posts/messaging/rabbitmq/
作者
Souloss
发布于
2026-04-06
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时