mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1844 字
5 分钟
消息积压与反压
2026-04-22

凌晨 3 点告警响起——Kafka 消费者 Lag 从 0 飙到 5000 万。生产者还在以每秒 5 万条的速度写入,消费者每秒只能处理 1 万条。差距每秒扩大 4 万条,磁盘空间还能撑 6 小时。这不是假设——这是大促后最常见的线上事故。消息积压的本质是生产速度持续大于消费速度,而解决积压的关键不是”加快消费”这么简单:是加消费者还是优化消费逻辑?是扩分区还是临时丢弃非关键消息?是反压生产者还是接受延迟?每个选择都有代价。

一、消息积压的根因分析#

1.1 积压的本质#

消息积压的本质是生产速度 > 消费速度的持续状态。短期的速度不匹配是正常的,但长期不匹配会导致积压:

graph LR subgraph "正常状态" P1["Producer<br/>1000 msg/s"] --> Q1["Queue<br/>Lag ≈ 0"] Q1 --> C1["Consumer<br/>1000 msg/s"] end subgraph "积压状态" P2["Producer<br/>10000 msg/s"] --> Q2["Queue<br/>Lag ↑↑↑"] Q2 --> C2["Consumer<br/>1000 msg/s"] end
积压阶段Lag 范围影响紧急度
轻微分钟级消费延迟增加
中度小时级业务感知延迟
严重天级数据过期、磁盘告警
灾难超出保留期消息丢失紧急

1.2 积压的五大根因#

根因说明典型场景
消费端故障Consumer 宕机或 OOM大促时消费者崩溃
消费逻辑慢DB 慢查询、外部 API 超时下游数据库性能下降
生产突增流量洪峰超出容量双十一秒杀
分区不均衡某些分区消息过多Key 分布不均匀
Rebalance 风暴频繁 Rebalance 导致消费暂停消费者配置不当
graph TB LAG["消息积压"] --> C1["消费端故障"] LAG --> C2["消费逻辑慢"] LAG --> C3["生产突增"] LAG --> C4["分区不均衡"] LAG --> C5["Rebalance 风暴"] C2 --> C2A["DB 慢查询"] C2 --> C2B["外部 API 超时"] C2 --> C2C["GC 停顿"] C3 --> C3A["大促流量"] C3 --> C3B["批量导入"] C3 --> C3C["重放历史数据"] C5 --> C5A["session.timeout 过短"] C5 --> C5B["消费者频繁上下线"] C5 --> C5C["心跳延迟"]

1.3 积压监控指标#

指标KafkaRabbitMQRocketMQ告警阈值
消费延迟Consumer LagQueue DepthConsumer Offset Diff> 10万条
消费速率bytes-consumed-ratemessage_rates.delivertps下降 50%
生产速率bytes-produced-ratemessage_rates.publishtps突增 3x
消费者状态Consumer StateConsumer CountConsumer Status消费者掉线
磁盘使用Disk UsageDisk FreeDisk Usage> 80%
# Kafka:查看 Consumer Lag
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group order-consumer-group
# 输出:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 0 12345 56789 44444
# orders 1 23456 67890 44434
# RabbitMQ:查看队列深度
rabbitmqctl list_queues name messages consumers message_rates
# RocketMQ:查看消费进度
./mqadmin consumerProgress -g order-consumer-group
Note

监控 Lag 时,不要只看绝对值,更要看趋势。Lag 持续增长说明消费速度跟不上生产速度;Lag 稳定说明虽然延迟但能追上;Lag 下降说明正在恢复。

二、消费速度优化#

2.1 增加消费者#

最直接的方案是增加消费者实例:

graph TB subgraph "优化前:2 消费者" Q1["Topic (4 partitions)"] --> C1["Consumer 1<br/>2 partitions"] Q1 --> C2["Consumer 2<br/>2 partitions"] Note1["总消费速度: 2000 msg/s"] end subgraph "优化后:4 消费者" Q2["Topic (4 partitions)"] --> C3["Consumer 1<br/>1 partition"] Q2 --> C4["Consumer 2<br/>1 partition"] Q2 --> C5["Consumer 3<br/>1 partition"] Q2 --> C6["Consumer 4<br/>1 partition"] Note2["总消费速度: 4000 msg/s"] end
系统限制说明解决方案
Kafka消费者数 ≤ 分区数增加分区数
RabbitMQ消费者数无限制直接增加消费者
RocketMQ消费者数 ≤ 队列数增加队列数
# Kafka:增加分区(注意:会影响 Key 路由)
kafka-topics --bootstrap-server localhost:9092 \
--alter --topic orders --partitions 16
# RocketMQ:增加队列
./mqadmin updateTopic -n localhost:9876 -t TopicOrder -c DefaultCluster -r 16

2.2 提升单消费者吞吐#

优化项Kafka 配置效果
批量消费max.poll.records=500减少网络往返
预取fetch.min.bytes=1MB减少请求次数
并发处理多线程处理拉取到的记录提高处理并行度
异步提交enable.auto.commit=true减少 commit 开销
// Kafka:多线程消费优化
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
ExecutorService executor = Executors.newFixedThreadPool(8);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 按 Partition 分组,每个 Partition 内保持有序
Map<TopicPartition, List<ConsumerRecord<String, String>>> partitioned =
records.partitions().stream()
.collect(Collectors.toMap(
tp -> tp,
tp -> records.records(tp)
));
List<Future<?>> futures = new ArrayList<>();
for (var entry : partitioned.entrySet()) {
futures.add(executor.submit(() -> {
for (var record : entry.getValue()) {
processRecord(record);
}
}));
}
// 等待所有线程完成
for (Future<?> f : futures) f.get();
consumer.commitSync();
}

2.3 跳过非关键消息#

在极端积压场景下,可以跳过部分非关键消息:

// 策略一:只消费最新消息(跳过积压)
props.put("auto.offset.reset", "latest"); // 从最新开始消费
// 策略二:重置 Offset 到最近位置
kafka-consumer-groups --bootstrap-server localhost:9092 \
--group order-consumer-group \
--reset-offsets --to-latest --execute --topic orders
// 策略三:按时间重置(跳过超过保留期的消息)
kafka-consumer-groups --bootstrap-server localhost:9092 \
--group order-consumer-group \
--reset-offsets --to-datetime 2026-10-12T00:00:00 --execute --topic orders
策略数据损失恢复速度适用场景
从最新消费丢失所有积压消息即时日志、监控数据
按时间重置丢失指定时间前的消息可容忍部分丢失
增加消费者首选方案
临时消费者需要额外资源

三、反压机制#

3.1 什么是反压?#

反压(Backpressure)是当消费者处理不过来时,向生产者传递”减速”信号,防止系统过载:

sequenceDiagram participant P as Producer participant Q as Queue participant C as Consumer Note over P,C: 正常状态 P->>Q: 1000 msg/s Q->>C: 1000 msg/s Note over C: 消费变慢! C->>Q: 只能处理 500 msg/s Q->>Q: 积压增加 Note over Q,C: 反压触发 Q->>P: 反压信号(减速) P->>Q: 500 msg/s(降速) Q->>C: 500 msg/s Note over Q: 积压不再增长

3.2 各系统的反压实现#

系统反压机制触发条件效果
Kafka无原生反压Consumer 自行控制拉取速率
RabbitMQCredit-based 流控内存/磁盘阈值阻塞 Connection
RocketMQ拉取式消费Consumer 处理能力自动调节拉取频率
PulsarConsumer 流控Permit 机制精确控制投递速率
// Kafka:Consumer 端流控
// 通过控制 poll 频率和 max.poll.records 实现反压
props.put("max.poll.records", 100); // 每次 poll 最多 100 条
props.put("max.poll.interval.ms", 300000); // 5 分钟内必须处理完
// 如果处理慢,自动降低 poll 频率
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.count() > 0) {
processRecords(records);
// 处理慢 → 下次 poll 延迟
}
}
// RabbitMQ:Consumer 端流控
channel.basicQos(100); // 预取 100 条
channel.basicConsume("order-queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String tag, Envelope envelope,
AMQP.BasicProperties props, byte[] body) {
processMessage(body);
// 手动确认后,RabbitMQ 才会投递新消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
// 如果处理慢,不 ACK → RabbitMQ 不再投递新消息 → 自动反压
// Pulsar:Consumer 端流控
Consumer<byte[]> consumer = client.newConsumer()
.topic("orders")
.subscriptionName("order-processor")
.receiverQueueSize(100) // 接收队列大小
.subscribe();
// Pulsar 使用 permit 机制:Consumer 消费一条消息后发放一个 permit
// Broker 只在有 permit 时才投递消息

3.3 Producer 端反压#

当消息系统本身无法承受写入速度时,需要对 Producer 施加反压:

// Kafka Producer:缓冲区满时阻塞
props.put("buffer.memory", 67108864); // 64MB 缓冲区
props.put("max.block.ms", 60000); // 缓冲区满时最多阻塞 60 秒
try {
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("Send failed", exception);
}
});
} catch (BufferExhaustedException e) {
// 缓冲区满,需要降速或降级
log.warn("Producer buffer full, applying backpressure");
}
策略说明数据损失适用场景
阻塞等待缓冲区满时阻塞可容忍延迟
降速降低发送速率流量控制
降级丢弃低优先级消息部分有优先级区分
转储写入本地文件/DB临时缓冲

四、死信队列#

4.1 死信的产生与处理#

当消息消费失败超过重试次数后,进入死信队列:

graph LR Q["业务队列"] -->|"消费失败"| RQ["重试队列<br/>(延迟递增)"] RQ -->|"重试成功"| Q RQ -->|"超过重试次数"| DLQ["死信队列<br/>(DLQ)"] DLQ --> HANDLER["死信处理器"] HANDLER -->|"人工修复"| Q HANDLER -->|"告警"| ALERT["告警系统"] HANDLER -->|"归档"| ARCHIVE["归档存储"]
// Kafka:手动实现死信队列
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
int retryCount = getRetryCount(record);
try {
processRecord(record);
} catch (Exception e) {
if (retryCount >= MAX_RETRIES) {
// 发送到死信 Topic
ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
"orders.DLQ",
record.key(),
record.value()
);
dlqProducer.send(dlqRecord);
log.error("Message sent to DLQ after {} retries: {}", retryCount, record.value());
} else {
// 发送到重试 Topic
sendToRetryTopic(record, retryCount + 1);
}
}
}
consumer.commitSync();
}

4.2 各系统的死信实现#

系统死信机制配置
Kafka无内置,需手动实现自定义 DLQ Topic
RabbitMQ原生死信 Exchangex-dead-letter-exchange
RocketMQ内置死信队列%DLQ%ConsumerGroup
Pulsar重试 Letter TopicretryLetterTopic
# RabbitMQ:查看死信队列
rabbitmqctl list_queues name messages arguments | grep dlx
# RocketMQ:查看死信队列
./mqadmin examineTopicStats -t %DLQ%order-consumer-group
# Kafka:消费死信 Topic
kafka-console-consumer --bootstrap-server localhost:9092 --topic orders.DLQ
Warning

死信队列不是”垃圾桶”——每条死信消息都代表一个业务异常。务必建立死信监控和告警机制,定期分析死信原因并修复。如果死信率持续上升,说明系统存在根本性问题。

五、降级策略#

5.1 多级降级方案#

当积压无法快速消除时,需要降级以保证核心业务:

graph TD LAG["消息积压"] --> LEVEL1["Level 1: 扩容<br/>增加消费者"] LEVEL1 -->|"不够"| LEVEL2["Level 2: 优化<br/>提升消费速度"] LEVEL2 -->|"不够"| LEVEL3["Level 3: 降级<br/>跳过非关键消息"] LEVEL3 -->|"不够"| LEVEL4["Level 4: 转储<br/>写入备用存储"] LEVEL4 -->|"不够"| LEVEL5["Level 5: 限流<br/>限制生产速率"]
降级级别策略影响恢复难度
Level 1增加消费者实例低(缩容即可)
Level 2优化消费逻辑
Level 3跳过非关键消息部分数据丢失
Level 4转储到备用存储延迟处理高(需回填)
Level 5限制生产速率业务受限

5.2 临时消费者模式#

// 临时消费者:快速消费积压消息,不做业务处理,只转储
public class EmergencyConsumer {
private final KafkaConsumer<String, String> consumer;
private final Producer<File> fileProducer; // 写入本地文件
public void startEmergencyConsume() {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (var record : records) {
// 转储到文件,后续回填
fileProducer.write(record.key(), record.value(), record.timestamp());
}
consumer.commitSync();
if (getLag() < THRESHOLD) {
log.info("Lag recovered, stopping emergency consumer");
break;
}
}
}
}

5.3 优先级消费#

// 按消息优先级消费
public class PriorityConsumer {
public void consumeWithPriority() {
// 先消费高优先级 Topic
consumeTopic("orders-high-priority");
// 再消费普通 Topic
consumeTopic("orders-normal");
// 最后消费低优先级 Topic
consumeTopic("orders-low-priority");
}
private void consumeTopic(String topic) {
// 消费到 Lag < 阈值后切换
while (getLag(topic) > LAG_THRESHOLD) {
var records = consumer.poll(Duration.ofMillis(100));
processRecords(records);
}
}
}

六、预防积压的架构设计#

6.1 容量规划#

指标计算公式示例
峰值生产速率日均 × 峰值倍数1000 × 10 = 10000 msg/s
所需消费者数峰值速率 / 单消费者速率10000 / 2000 = 5
安全余量所需消费者 × 1.55 × 1.5 = 8
磁盘容量峰值速率 × 保留时间 × 消息大小10000 × 86400 × 1KB = 864GB/天

6.2 自动伸缩#

# Kubernetes HPA:基于 Consumer Lag 自动伸缩
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: order-consumer-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: order-consumer
minReplicas: 2
maxReplicas: 20
metrics:
- type: External
external:
metric:
name: kafka_consumer_lag
target:
type: AverageValue
averageValue: "10000"

6.3 熔断与限流#

// Producer 端限流
public class RateLimitedProducer {
private final RateLimiter rateLimiter = RateLimiter.create(5000); // 5000 msg/s
public void send(String topic, String key, byte[] value) {
if (!rateLimiter.tryAcquire(100, TimeUnit.MILLISECONDS)) {
// 限流:丢弃或缓冲
log.warn("Rate limited, message buffered");
bufferMessage(topic, key, value);
return;
}
producer.send(new ProducerRecord<>(topic, key, value));
}
}

七、各系统积压处理对比#

维度KafkaRabbitMQRocketMQPulsar
积压指标Consumer LagQueue DepthOffset DiffBacklog Size
增加消费者≤ 分区数无限制≤ 队列数无限制
反压机制Consumer 自控Credit-based拉取式Permit
死信队列手动实现原生支持内置重试 Letter
消息跳过Offset 重置丢弃策略跳过非活跃Subscription reset
分区扩容支持(影响路由)N/A支持支持
Tip

消息积压的处理原则:1)监控先行——Lag 告警是第一道防线;2)快速扩容——消费者实例的弹性伸缩是首选方案;3)降级有序——从扩容到优化到跳过,逐步升级;4)根因分析——积压是症状,找到根因才能根治。

八、总结#

上一章从全景视角介绍了消息有序性。

维度关键要点
根因分析消费端故障、逻辑慢、生产突增、分区不均、Rebalance 风暴
消费优化增加消费者、提升单消费者吞吐、跳过非关键消息
反压机制Consumer 端流控(QoS/permit)、Producer 端限流
死信队列重试失败后进入 DLQ,必须监控和告警
降级策略扩容 → 优化 → 跳过 → 转储 → 限流
预防设计容量规划、自动伸缩、熔断限流

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

消息积压与反压
https://blog.souloss.com/posts/messaging/message-backlog-and-backpressure/
作者
Souloss
发布于
2026-04-22
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时