Broker 3 宕机了——它上面有 5 个分区的 Leader 副本。这 5 个分区中,有 3 个的 ISR(同步副本集合)还剩 2 个,可以选出新 Leader 继续服务;另外 2 个的 ISR 只剩 1 个,如果 min.insync.replicas=2,写入直接被拒绝。更糟的情况:宕机的 Broker 刚收到生产者的消息但还没同步给 Follower——这些消息丢了。Kafka 的可靠性不是默认保证的,而是由 acks、min.insync.replicas、unclean.leader.election.enable 这几个配置的组合决定的。
一、可靠性层次模型
1.1 消息丢失的三种场景
在分布式消息系统中,消息丢失可能发生在三个环节:
| 环节 | 丢失原因 | Kafka 的应对 |
|---|---|---|
| 生产者 → Broker | 网络故障、ACK 未收到 | acks=all + 重试 |
| Broker 内部 | Broker 宕机、磁盘故障 | 副本机制 + ISR |
| Broker → 消费者 | 消费者处理失败但提交了 Offset | 手动提交 + 幂等消费 |
1.2 可靠性配置金字塔
Kafka 的可靠性不是单一配置能保证的,而是多层配置的组合:
| 层级 | 配置 | 作用 |
|---|---|---|
| 生产者 | acks=all | 等待所有 ISR 副本确认 |
| Broker | min.insync.replicas=2 | ISR 至少 2 个副本才允许写入 |
| Broker | unclean.leader.election.enable=false | 禁止非 ISR 副本成为 Leader |
| Broker | replication.factor=3 | 3 个副本保证 1 个可故障 |
| 消费者 | enable.auto.commit=false | 手动提交 Offset |
# 生产环境可靠性配置清单# Broker 端default.replication.factor=3min.insync.replicas=2unclean.leader.election.enable=false
# Producer 端acks=allretries=2147483647max.in.flight.requests.per.connection=5enable.idempotence=true
# Consumer 端enable.auto.commit=falseauto.offset.reset=earliest二、ACK 策略深入
2.1 三种 ACK 级别
| ACK 级别 | 行为 | 可靠性 | 延迟 | 适用场景 |
|---|---|---|---|---|
acks=0 | 不等待任何确认 | 最低 | 最低 | 日志收集、指标 |
acks=1 | 等待 Leader 确认 | 中等 | 中等 | 一般业务 |
acks=all | 等待所有 ISR 确认 | 最高 | 最高 | 金融交易、订单 |
// ACK 策略配置与使用Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// acks=0:发完就忘props.put(ProducerConfig.ACKS_CONFIG, "0");// 适用于:日志收集,丢失少量日志可接受
// acks=1:Leader 确认props.put(ProducerConfig.ACKS_CONFIG, "1");// 适用于:一般业务,Leader 写入即返回// 风险:Leader 确认后宕机,Follower 未同步
// acks=all:全部 ISR 确认(推荐)props.put(ProducerConfig.ACKS_CONFIG, "all");// 适用于:金融交易、订单等不可丢失场景// 注意:必须配合 min.insync.replicas 使用2.2 acks=all 与 min.insync.replicas 的配合
acks=all 中的”all”指的是 ISR 中的所有副本,而非所有副本。这意味着:
- 如果 ISR 只有 Leader 一个副本,
acks=all等同于acks=1 min.insync.replicas限制了 ISR 的最小数量
# min.insync.replicas 配置# Broker 级别(默认值)min.insync.replicas=1
# Topic 级别(推荐覆盖)kafka-configs --alter --bootstrap-server localhost:9092 \ --entity-type topics --entity-name orders \ --add-config min.insync.replicas=2
# 验证配置kafka-configs --describe --bootstrap-server localhost:9092 \ --entity-type topics --entity-name ordersmin.insync.replicas 设置过大可能导致集群不可用。例如 3 副本 + min.insync.replicas=3,意味着所有 3 个副本都必须在线才能写入——任何一个 Broker 宕机都会导致写入失败。推荐配置:replication.factor=3 + min.insync.replicas=2。
三、幂等生产者
3.1 消息重复的原因
在网络不稳定的环境中,生产者重试可能导致消息重复:
| 场景 | 原因 | 结果 |
|---|---|---|
| 生产者发送成功,ACK 丢失 | 网络抖动 | 生产者重试 → 消息重复 |
| 生产者发送成功,ACK 延迟 | 网络延迟 | 超时重试 → 消息重复 |
| Broker 写入成功,响应前宕机 | Broker 故障 | 生产者重试 → 消息重复 |
3.2 幂等生产者原理
Kafka 0.11+ 引入幂等生产者,通过 Producer ID (PID) 和 Sequence Number 实现去重:
// 启用幂等生产者Properties props = new Properties();props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");// 等价于以下配置:// acks=all// retries=Integer.MAX_VALUE// max.in.flight.requests.per.connection<=5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 幂等生产者自动为每条消息分配:// 1. PID (Producer ID) - 生产者启动时由 Broker 分配// 2. Sequence Number - 每个 <PID, Topic, Partition> 递增// Broker 端检查:如果收到的 SeqNum <= 已提交的 SeqNum,则判定为重复3.3 幂等的局限性
| 维度 | 幂等生产者 | 事务生产者 |
|---|---|---|
| 去重范围 | 单个 <PID, Topic, Partition> | 跨分区 |
| 跨分区 | 不保证 | 保证 |
| 跨会话 | 不保证(PID 重新分配) | 保证(事务 ID) |
| 消费端 | 不保证 | 保证(Read Committed) |
| 性能开销 | 低 | 中等 |
幂等生产者只能保证单分区内的去重。如果生产者向多个分区发送消息,无法保证跨分区的 exactly-once。需要跨分区保证时,必须使用事务 API。
四、事务 API
4.1 事务模型
Kafka 事务支持”消费-处理-生产”的 exactly-once 语义:
4.2 事务 API 使用
// 事务生产者配置Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-1");props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务producer.initTransactions();
try { // 开始事务 producer.beginTransaction();
// 从 Topic A 消费并处理 ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { // 处理消息 String result = processOrder(record.value()); // 写入 Topic B producer.send(new ProducerRecord<>("orders-processed", record.key(), result)); }
// 提交消费 Offset(作为事务的一部分) producer.sendOffsetsToTransaction( getOffsetsToCommit(records), consumer.groupMetadata() );
// 提交事务(原子性:写入 + Offset 提交) producer.commitTransaction();} catch (ProducerFencedException e) { // 另一个具有相同 transactional.id 的生产者已启动 producer.close();} catch (KafkaException e) { // 事务失败,回滚 producer.abortTransaction();}4.3 事务隔离级别
消费者端需要配置隔离级别来控制是否读取事务消息:
| 隔离级别 | 行为 | 适用场景 |
|---|---|---|
read_uncommitted | 读取所有消息(包括未提交事务) | 默认,性能优先 |
read_committed | 只读取已提交事务的消息 | 需要 exactly-once |
// 消费者配置Properties consumerProps = new Properties();consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");// read_committed 消费者不会返回未提交事务的消息// 它会等待事务提交或超时后才返回消息
// 事务超时配置// Producer 端props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000"); // 60 秒
// Broker 端// transaction.max.timeout.ms=900000 // 最大 15 分钟// transactional.id.expiration.ms=604800000 // 7 天过期4.4 事务协调器
五、消费者可靠性
5.1 Offset 提交策略
| 策略 | 实现方式 | 语义保证 | 风险 |
|---|---|---|---|
| 自动提交 | enable.auto.commit=true | at-most-once | 消息丢失 |
| 同步手动提交 | commitSync() | at-least-once | 重复消费 |
| 异步手动提交 | commitAsync() | at-least-once | 重复消费 + 丢失 |
| 组合提交 | commitAsync() + commitSync() | at-least-once | 推荐 |
// 组合提交模式(推荐)try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) { processRecord(record); }
// 正常情况异步提交(高性能) consumer.commitAsync(); }} finally { try { // 关闭前同步提交(确保最后一次提交成功) consumer.commitSync(); } finally { consumer.close(); }}
// 精确 Offset 提交(处理完每条消息后)Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();for (ConsumerRecord<String, String> record : records) { try { processRecord(record); offsets.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1) ); } catch (Exception e) { // 处理失败,不提交该消息的 Offset log.error("处理消息失败: offset={}", record.offset(), e); }}if (!offsets.isEmpty()) { consumer.commitSync(offsets);}5.2 消费者重试与死信队列
// 消费者重试 + 死信队列模式public class RetryConsumer { private final KafkaProducer<String, String> dlqProducer; private final int maxRetries;
public void consume() { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) { int retryCount = getRetryCount(record);
if (retryCount >= maxRetries) { // 超过重试次数,发送到死信队列 dlqProducer.send(new ProducerRecord<>( "orders-dlq", record.key(), enrichWithRetryInfo(record, retryCount) )); } else { try { processRecord(record); } catch (Exception e) { // 重试:发送到重试 Topic dlqProducer.send(new ProducerRecord<>( "orders-retry", record.key(), enrichWithRetryInfo(record, retryCount + 1) )); } } } consumer.commitSync(); } }}六、端到端 Exactly-Once
6.1 Exactly-Once 的挑战
在分布式系统中,实现端到端 exactly-once 极其困难:
| 挑战 | 原因 | 解决方案 |
|---|---|---|
| 生产者重复 | 网络重试 | 幂等生产者 |
| 消费者重复 | Offset 提交与处理不同步 | 事务 API |
| 跨系统一致性 | Kafka 与数据库无法原子操作 | 事务性发件箱 |
| 消费者重启 | 状态丢失 | 状态存储 + 检查点 |
6.2 Kafka Streams 的 exactly-once
// Kafka Streams exactly-once 配置Properties props = new Properties();props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);// Kafka 3.0+ 使用 EXACTLY_ONCE_V2(更高效的事务协议)
// Kafka Streams 内部使用事务 API 保证 exactly-once// 1. 消费输入 Topic// 2. 处理记录// 3. 写入输出 Topic + 更新状态存储 + 提交 Offset// 以上三步在同一个事务中完成
StreamsBuilder builder = new StreamsBuilder();KStream<String, String> orders = builder.stream("orders");
orders.groupByKey() .aggregate( () -> 0.0, // 初始值 (key, value, aggregate) -> aggregate + parseAmount(value), Materialized.with(Serdes.String(), Serdes.Double()) ) .toStream() .to("order-totals");七、可靠性监控
7.1 关键监控指标
| 指标 | 含义 | 告警条件 |
|---|---|---|
record-error-rate | 生产者发送错误率 | > 0 |
record-retry-rate | 生产者重试率 | > 0 持续增长 |
record-send-rate | 发送速率 | 突然下降 |
commit-latency-avg | Offset 提交延迟 | > 1s |
records-lag-max | 消费者最大落后量 | > 10000 |
UnderReplicatedPartitions | ISR 不足的分区数 | > 0 |
# 消费者 Lag 监控kafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group order-service
# 生产者指标(JMX)# kafka.producer:type=producer-metrics,client-id=*# kafka.producer:type=producer-topic-metrics,client-id=*,topic=*
# Broker 可靠性指标# kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions# kafka.server:type=ReplicaManager,name=OfflinePartitionsCount# kafka.server:type=TransactionCoordinator,name=*7.2 可靠性测试
# 模拟 Broker 宕机# 1. 启动 3 Broker 集群# 2. 启动生产者持续发送消息# 3. 杀掉一个 Brokerkill -9 <broker-pid>
# 4. 验证消息不丢失kafka-console-consumer --bootstrap-server localhost:9092 \ --topic orders --from-beginning \ --property print.offset=true
# 模拟网络分区iptables -A INPUT -s <broker-ip> -j DROPiptables -D INPUT -s <broker-ip> -j DROP
# 性能测试(可靠性配置下)kafka-producer-perf-test --topic orders \ --num-records 1000000 \ --record-size 1024 \ --throughput -1 \ --producer-props bootstrap.servers=localhost:9092 \ acks=all \ enable.idempotence=true在生产环境中,可靠性测试应该包含:Broker 宕机恢复、网络分区、磁盘故障、Controller 切换等场景。使用 Chaos Engineering 方法(如 Chaos Monkey)定期验证系统的容错能力。
八、可靠性配置速查表
| 场景 | acks | min.insync.replicas | 幂等 | 事务 | 适用 |
|---|---|---|---|---|---|
| 日志收集 | 0 | 1 | 否 | 否 | 可丢可重 |
| 用户行为追踪 | 1 | 1 | 是 | 否 | 可重不可丢 |
| 订单处理 | all | 2 | 是 | 是 | 不可丢不可重 |
| 支付交易 | all | 2 | 是 | 是 | 不可丢不可重 |
九、总结
上一章探讨了Kafka 存储与零拷贝。
| 维度 | 关键要点 |
|---|---|
| ACK 策略 | acks=all + min.insync.replicas=2 是可靠性的基础配置 |
| 幂等生产者 | 解决单分区内消息重复,启用 enable.idempotence=true |
| 事务 API | 解决跨分区 exactly-once,支持”消费-处理-生产”原子操作 |
| 消费者可靠性 | 手动提交 Offset + 重试 + 死信队列,保证 at-least-once |
| 端到端 | Kafka Streams 提供 exactly-once-v2,是最简单的端到端方案 |
| 监控 | record-error-rate、records-lag-max、UnderReplicatedPartitions 是关键指标 |
exactly-once 不是银弹。事务 API 有性能开销(约 20-30% 吞吐下降),且增加了系统复杂度。在选择可靠性级别时,要权衡业务需求与性能成本——大多数场景下 at-least-once + 幂等消费已经足够。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






