1844 字
5 分钟
消息积压与反压
凌晨 3 点告警响起——Kafka 消费者 Lag 从 0 飙到 5000 万。生产者还在以每秒 5 万条的速度写入,消费者每秒只能处理 1 万条。差距每秒扩大 4 万条,磁盘空间还能撑 6 小时。这不是假设——这是大促后最常见的线上事故。消息积压的本质是生产速度持续大于消费速度,而解决积压的关键不是”加快消费”这么简单:是加消费者还是优化消费逻辑?是扩分区还是临时丢弃非关键消息?是反压生产者还是接受延迟?每个选择都有代价。
一、消息积压的根因分析
1.1 积压的本质
消息积压的本质是生产速度 > 消费速度的持续状态。短期的速度不匹配是正常的,但长期不匹配会导致积压:
graph LR
subgraph "正常状态"
P1["Producer<br/>1000 msg/s"] --> Q1["Queue<br/>Lag ≈ 0"]
Q1 --> C1["Consumer<br/>1000 msg/s"]
end
subgraph "积压状态"
P2["Producer<br/>10000 msg/s"] --> Q2["Queue<br/>Lag ↑↑↑"]
Q2 --> C2["Consumer<br/>1000 msg/s"]
end
| 积压阶段 | Lag 范围 | 影响 | 紧急度 |
|---|---|---|---|
| 轻微 | 分钟级 | 消费延迟增加 | 低 |
| 中度 | 小时级 | 业务感知延迟 | 中 |
| 严重 | 天级 | 数据过期、磁盘告警 | 高 |
| 灾难 | 超出保留期 | 消息丢失 | 紧急 |
1.2 积压的五大根因
| 根因 | 说明 | 典型场景 |
|---|---|---|
| 消费端故障 | Consumer 宕机或 OOM | 大促时消费者崩溃 |
| 消费逻辑慢 | DB 慢查询、外部 API 超时 | 下游数据库性能下降 |
| 生产突增 | 流量洪峰超出容量 | 双十一秒杀 |
| 分区不均衡 | 某些分区消息过多 | Key 分布不均匀 |
| Rebalance 风暴 | 频繁 Rebalance 导致消费暂停 | 消费者配置不当 |
graph TB
LAG["消息积压"] --> C1["消费端故障"]
LAG --> C2["消费逻辑慢"]
LAG --> C3["生产突增"]
LAG --> C4["分区不均衡"]
LAG --> C5["Rebalance 风暴"]
C2 --> C2A["DB 慢查询"]
C2 --> C2B["外部 API 超时"]
C2 --> C2C["GC 停顿"]
C3 --> C3A["大促流量"]
C3 --> C3B["批量导入"]
C3 --> C3C["重放历史数据"]
C5 --> C5A["session.timeout 过短"]
C5 --> C5B["消费者频繁上下线"]
C5 --> C5C["心跳延迟"]
1.3 积压监控指标
| 指标 | Kafka | RabbitMQ | RocketMQ | 告警阈值 |
|---|---|---|---|---|
| 消费延迟 | Consumer Lag | Queue Depth | Consumer Offset Diff | > 10万条 |
| 消费速率 | bytes-consumed-rate | message_rates.deliver | tps | 下降 50% |
| 生产速率 | bytes-produced-rate | message_rates.publish | tps | 突增 3x |
| 消费者状态 | Consumer State | Consumer Count | Consumer Status | 消费者掉线 |
| 磁盘使用 | Disk Usage | Disk Free | Disk Usage | > 80% |
# Kafka:查看 Consumer Lagkafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group order-consumer-group
# 输出:# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG# orders 0 12345 56789 44444# orders 1 23456 67890 44434
# RabbitMQ:查看队列深度rabbitmqctl list_queues name messages consumers message_rates
# RocketMQ:查看消费进度./mqadmin consumerProgress -g order-consumer-groupNote
监控 Lag 时,不要只看绝对值,更要看趋势。Lag 持续增长说明消费速度跟不上生产速度;Lag 稳定说明虽然延迟但能追上;Lag 下降说明正在恢复。
二、消费速度优化
2.1 增加消费者
最直接的方案是增加消费者实例:
graph TB
subgraph "优化前:2 消费者"
Q1["Topic (4 partitions)"] --> C1["Consumer 1<br/>2 partitions"]
Q1 --> C2["Consumer 2<br/>2 partitions"]
Note1["总消费速度: 2000 msg/s"]
end
subgraph "优化后:4 消费者"
Q2["Topic (4 partitions)"] --> C3["Consumer 1<br/>1 partition"]
Q2 --> C4["Consumer 2<br/>1 partition"]
Q2 --> C5["Consumer 3<br/>1 partition"]
Q2 --> C6["Consumer 4<br/>1 partition"]
Note2["总消费速度: 4000 msg/s"]
end
| 系统限制 | 说明 | 解决方案 |
|---|---|---|
| Kafka | 消费者数 ≤ 分区数 | 增加分区数 |
| RabbitMQ | 消费者数无限制 | 直接增加消费者 |
| RocketMQ | 消费者数 ≤ 队列数 | 增加队列数 |
# Kafka:增加分区(注意:会影响 Key 路由)kafka-topics --bootstrap-server localhost:9092 \ --alter --topic orders --partitions 16
# RocketMQ:增加队列./mqadmin updateTopic -n localhost:9876 -t TopicOrder -c DefaultCluster -r 162.2 提升单消费者吞吐
| 优化项 | Kafka 配置 | 效果 |
|---|---|---|
| 批量消费 | max.poll.records=500 | 减少网络往返 |
| 预取 | fetch.min.bytes=1MB | 减少请求次数 |
| 并发处理 | 多线程处理拉取到的记录 | 提高处理并行度 |
| 异步提交 | enable.auto.commit=true | 减少 commit 开销 |
// Kafka:多线程消费优化KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(List.of("orders"));
ExecutorService executor = Executors.newFixedThreadPool(8);
while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); // 按 Partition 分组,每个 Partition 内保持有序 Map<TopicPartition, List<ConsumerRecord<String, String>>> partitioned = records.partitions().stream() .collect(Collectors.toMap( tp -> tp, tp -> records.records(tp) ));
List<Future<?>> futures = new ArrayList<>(); for (var entry : partitioned.entrySet()) { futures.add(executor.submit(() -> { for (var record : entry.getValue()) { processRecord(record); } })); }
// 等待所有线程完成 for (Future<?> f : futures) f.get(); consumer.commitSync();}2.3 跳过非关键消息
在极端积压场景下,可以跳过部分非关键消息:
// 策略一:只消费最新消息(跳过积压)props.put("auto.offset.reset", "latest"); // 从最新开始消费
// 策略二:重置 Offset 到最近位置kafka-consumer-groups --bootstrap-server localhost:9092 \ --group order-consumer-group \ --reset-offsets --to-latest --execute --topic orders
// 策略三:按时间重置(跳过超过保留期的消息)kafka-consumer-groups --bootstrap-server localhost:9092 \ --group order-consumer-group \ --reset-offsets --to-datetime 2026-10-12T00:00:00 --execute --topic orders| 策略 | 数据损失 | 恢复速度 | 适用场景 |
|---|---|---|---|
| 从最新消费 | 丢失所有积压消息 | 即时 | 日志、监控数据 |
| 按时间重置 | 丢失指定时间前的消息 | 快 | 可容忍部分丢失 |
| 增加消费者 | 无 | 中 | 首选方案 |
| 临时消费者 | 无 | 快 | 需要额外资源 |
三、反压机制
3.1 什么是反压?
反压(Backpressure)是当消费者处理不过来时,向生产者传递”减速”信号,防止系统过载:
sequenceDiagram
participant P as Producer
participant Q as Queue
participant C as Consumer
Note over P,C: 正常状态
P->>Q: 1000 msg/s
Q->>C: 1000 msg/s
Note over C: 消费变慢!
C->>Q: 只能处理 500 msg/s
Q->>Q: 积压增加
Note over Q,C: 反压触发
Q->>P: 反压信号(减速)
P->>Q: 500 msg/s(降速)
Q->>C: 500 msg/s
Note over Q: 积压不再增长
3.2 各系统的反压实现
| 系统 | 反压机制 | 触发条件 | 效果 |
|---|---|---|---|
| Kafka | 无原生反压 | — | Consumer 自行控制拉取速率 |
| RabbitMQ | Credit-based 流控 | 内存/磁盘阈值 | 阻塞 Connection |
| RocketMQ | 拉取式消费 | Consumer 处理能力 | 自动调节拉取频率 |
| Pulsar | Consumer 流控 | Permit 机制 | 精确控制投递速率 |
// Kafka:Consumer 端流控// 通过控制 poll 频率和 max.poll.records 实现反压props.put("max.poll.records", 100); // 每次 poll 最多 100 条props.put("max.poll.interval.ms", 300000); // 5 分钟内必须处理完
// 如果处理慢,自动降低 poll 频率while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.count() > 0) { processRecords(records); // 处理慢 → 下次 poll 延迟 }}// RabbitMQ:Consumer 端流控channel.basicQos(100); // 预取 100 条
channel.basicConsume("order-queue", false, new DefaultConsumer(channel) { @Override public void handleDelivery(String tag, Envelope envelope, AMQP.BasicProperties props, byte[] body) { processMessage(body); // 手动确认后,RabbitMQ 才会投递新消息 channel.basicAck(envelope.getDeliveryTag(), false); }});
// 如果处理慢,不 ACK → RabbitMQ 不再投递新消息 → 自动反压// Pulsar:Consumer 端流控Consumer<byte[]> consumer = client.newConsumer() .topic("orders") .subscriptionName("order-processor") .receiverQueueSize(100) // 接收队列大小 .subscribe();
// Pulsar 使用 permit 机制:Consumer 消费一条消息后发放一个 permit// Broker 只在有 permit 时才投递消息3.3 Producer 端反压
当消息系统本身无法承受写入速度时,需要对 Producer 施加反压:
// Kafka Producer:缓冲区满时阻塞props.put("buffer.memory", 67108864); // 64MB 缓冲区props.put("max.block.ms", 60000); // 缓冲区满时最多阻塞 60 秒
try { producer.send(record, (metadata, exception) -> { if (exception != null) { log.error("Send failed", exception); } });} catch (BufferExhaustedException e) { // 缓冲区满,需要降速或降级 log.warn("Producer buffer full, applying backpressure");}| 策略 | 说明 | 数据损失 | 适用场景 |
|---|---|---|---|
| 阻塞等待 | 缓冲区满时阻塞 | 无 | 可容忍延迟 |
| 降速 | 降低发送速率 | 无 | 流量控制 |
| 降级 | 丢弃低优先级消息 | 部分 | 有优先级区分 |
| 转储 | 写入本地文件/DB | 无 | 临时缓冲 |
四、死信队列
4.1 死信的产生与处理
当消息消费失败超过重试次数后,进入死信队列:
graph LR
Q["业务队列"] -->|"消费失败"| RQ["重试队列<br/>(延迟递增)"]
RQ -->|"重试成功"| Q
RQ -->|"超过重试次数"| DLQ["死信队列<br/>(DLQ)"]
DLQ --> HANDLER["死信处理器"]
HANDLER -->|"人工修复"| Q
HANDLER -->|"告警"| ALERT["告警系统"]
HANDLER -->|"归档"| ARCHIVE["归档存储"]
// Kafka:手动实现死信队列while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (var record : records) { int retryCount = getRetryCount(record); try { processRecord(record); } catch (Exception e) { if (retryCount >= MAX_RETRIES) { // 发送到死信 Topic ProducerRecord<String, String> dlqRecord = new ProducerRecord<>( "orders.DLQ", record.key(), record.value() ); dlqProducer.send(dlqRecord); log.error("Message sent to DLQ after {} retries: {}", retryCount, record.value()); } else { // 发送到重试 Topic sendToRetryTopic(record, retryCount + 1); } } } consumer.commitSync();}4.2 各系统的死信实现
| 系统 | 死信机制 | 配置 |
|---|---|---|
| Kafka | 无内置,需手动实现 | 自定义 DLQ Topic |
| RabbitMQ | 原生死信 Exchange | x-dead-letter-exchange |
| RocketMQ | 内置死信队列 | %DLQ%ConsumerGroup |
| Pulsar | 重试 Letter Topic | retryLetterTopic |
# RabbitMQ:查看死信队列rabbitmqctl list_queues name messages arguments | grep dlx
# RocketMQ:查看死信队列./mqadmin examineTopicStats -t %DLQ%order-consumer-group
# Kafka:消费死信 Topickafka-console-consumer --bootstrap-server localhost:9092 --topic orders.DLQWarning
死信队列不是”垃圾桶”——每条死信消息都代表一个业务异常。务必建立死信监控和告警机制,定期分析死信原因并修复。如果死信率持续上升,说明系统存在根本性问题。
五、降级策略
5.1 多级降级方案
当积压无法快速消除时,需要降级以保证核心业务:
graph TD
LAG["消息积压"] --> LEVEL1["Level 1: 扩容<br/>增加消费者"]
LEVEL1 -->|"不够"| LEVEL2["Level 2: 优化<br/>提升消费速度"]
LEVEL2 -->|"不够"| LEVEL3["Level 3: 降级<br/>跳过非关键消息"]
LEVEL3 -->|"不够"| LEVEL4["Level 4: 转储<br/>写入备用存储"]
LEVEL4 -->|"不够"| LEVEL5["Level 5: 限流<br/>限制生产速率"]
| 降级级别 | 策略 | 影响 | 恢复难度 |
|---|---|---|---|
| Level 1 | 增加消费者实例 | 无 | 低(缩容即可) |
| Level 2 | 优化消费逻辑 | 无 | 低 |
| Level 3 | 跳过非关键消息 | 部分数据丢失 | 中 |
| Level 4 | 转储到备用存储 | 延迟处理 | 高(需回填) |
| Level 5 | 限制生产速率 | 业务受限 | 低 |
5.2 临时消费者模式
// 临时消费者:快速消费积压消息,不做业务处理,只转储public class EmergencyConsumer { private final KafkaConsumer<String, String> consumer; private final Producer<File> fileProducer; // 写入本地文件
public void startEmergencyConsume() { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (var record : records) { // 转储到文件,后续回填 fileProducer.write(record.key(), record.value(), record.timestamp()); } consumer.commitSync();
if (getLag() < THRESHOLD) { log.info("Lag recovered, stopping emergency consumer"); break; } } }}5.3 优先级消费
// 按消息优先级消费public class PriorityConsumer { public void consumeWithPriority() { // 先消费高优先级 Topic consumeTopic("orders-high-priority"); // 再消费普通 Topic consumeTopic("orders-normal"); // 最后消费低优先级 Topic consumeTopic("orders-low-priority"); }
private void consumeTopic(String topic) { // 消费到 Lag < 阈值后切换 while (getLag(topic) > LAG_THRESHOLD) { var records = consumer.poll(Duration.ofMillis(100)); processRecords(records); } }}六、预防积压的架构设计
6.1 容量规划
| 指标 | 计算公式 | 示例 |
|---|---|---|
| 峰值生产速率 | 日均 × 峰值倍数 | 1000 × 10 = 10000 msg/s |
| 所需消费者数 | 峰值速率 / 单消费者速率 | 10000 / 2000 = 5 |
| 安全余量 | 所需消费者 × 1.5 | 5 × 1.5 = 8 |
| 磁盘容量 | 峰值速率 × 保留时间 × 消息大小 | 10000 × 86400 × 1KB = 864GB/天 |
6.2 自动伸缩
# Kubernetes HPA:基于 Consumer Lag 自动伸缩apiVersion: autoscaling/v2kind: HorizontalPodAutoscalermetadata: name: order-consumer-hpaspec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: order-consumer minReplicas: 2 maxReplicas: 20 metrics: - type: External external: metric: name: kafka_consumer_lag target: type: AverageValue averageValue: "10000"6.3 熔断与限流
// Producer 端限流public class RateLimitedProducer { private final RateLimiter rateLimiter = RateLimiter.create(5000); // 5000 msg/s
public void send(String topic, String key, byte[] value) { if (!rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) { // 限流:丢弃或缓冲 log.warn("Rate limited, message buffered"); bufferMessage(topic, key, value); return; } producer.send(new ProducerRecord<>(topic, key, value)); }}七、各系统积压处理对比
| 维度 | Kafka | RabbitMQ | RocketMQ | Pulsar |
|---|---|---|---|---|
| 积压指标 | Consumer Lag | Queue Depth | Offset Diff | Backlog Size |
| 增加消费者 | ≤ 分区数 | 无限制 | ≤ 队列数 | 无限制 |
| 反压机制 | Consumer 自控 | Credit-based | 拉取式 | Permit |
| 死信队列 | 手动实现 | 原生支持 | 内置 | 重试 Letter |
| 消息跳过 | Offset 重置 | 丢弃策略 | 跳过非活跃 | Subscription reset |
| 分区扩容 | 支持(影响路由) | N/A | 支持 | 支持 |
Tip
消息积压的处理原则:1)监控先行——Lag 告警是第一道防线;2)快速扩容——消费者实例的弹性伸缩是首选方案;3)降级有序——从扩容到优化到跳过,逐步升级;4)根因分析——积压是症状,找到根因才能根治。
八、总结
上一章从全景视角介绍了消息有序性。
| 维度 | 关键要点 |
|---|---|
| 根因分析 | 消费端故障、逻辑慢、生产突增、分区不均、Rebalance 风暴 |
| 消费优化 | 增加消费者、提升单消费者吞吐、跳过非关键消息 |
| 反压机制 | Consumer 端流控(QoS/permit)、Producer 端限流 |
| 死信队列 | 重试失败后进入 DLQ,必须监控和告警 |
| 降级策略 | 扩容 → 优化 → 跳过 → 转储 → 限流 |
| 预防设计 | 容量规划、自动伸缩、熔断限流 |
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时
相关文章 智能推荐
1
消息与数据库一致性
消息队列与事件流 深入消息与数据库一致性——双写问题、本地消息表、事务性发件箱、CDC 同步。
2
RocketMQ:事务消息与延迟消息
消息队列与事件流 深入 RocketMQ——NameServer/Broker 架构、事务消息(两阶段提交)、延迟消息、顺序消息与消息过滤。
3
RabbitMQ:AMQP 与交换机
消息队列与事件流 深入 RabbitMQ——AMQP 模型、Exchange 类型、队列类型、消息确认、死信队列,理解 RabbitMQ 的智能路由与灵活消息模型。
4
消息队列选型
消息队列与事件流 深入消息队列选型——Kafka/RabbitMQ/RocketMQ/Pulsar 全面对比、场景匹配、选型决策树。
5
事件溯源与 CQRS
消息队列与事件流 深入事件溯源与 CQRS——Event Sourcing 原理、CQRS 读写分离、快照机制、投影重建。






