mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1578 字
4 分钟
Kafka 可靠性与 exactly-once
2026-03-29

Broker 3 宕机了——它上面有 5 个分区的 Leader 副本。这 5 个分区中,有 3 个的 ISR(同步副本集合)还剩 2 个,可以选出新 Leader 继续服务;另外 2 个的 ISR 只剩 1 个,如果 min.insync.replicas=2,写入直接被拒绝。更糟的情况:宕机的 Broker 刚收到生产者的消息但还没同步给 Follower——这些消息丢了。Kafka 的可靠性不是默认保证的,而是由 acksmin.insync.replicasunclean.leader.election.enable 这几个配置的组合决定的。

一、可靠性层次模型#

1.1 消息丢失的三种场景#

在分布式消息系统中,消息丢失可能发生在三个环节:

环节丢失原因Kafka 的应对
生产者 → Broker网络故障、ACK 未收到acks=all + 重试
Broker 内部Broker 宕机、磁盘故障副本机制 + ISR
Broker → 消费者消费者处理失败但提交了 Offset手动提交 + 幂等消费
graph LR subgraph "消息丢失风险点" P["生产者"] -->|1. 网络故障| B["Broker"] B -->|2. Broker 宕机| B2["副本"] B2 -->|3. 消费失败| C["消费者"] end subgraph "Kafka 可靠性机制" ACK["acks=all<br/>确保写入 ISR"] ISR["ISR + min.insync.replicas<br/>确保副本一致"] MANUAL["手动提交 Offset<br/>确保处理完成"] end P -.-> ACK B -.-> ISR C -.-> MANUAL

1.2 可靠性配置金字塔#

Kafka 的可靠性不是单一配置能保证的,而是多层配置的组合:

层级配置作用
生产者acks=all等待所有 ISR 副本确认
Brokermin.insync.replicas=2ISR 至少 2 个副本才允许写入
Brokerunclean.leader.election.enable=false禁止非 ISR 副本成为 Leader
Brokerreplication.factor=33 个副本保证 1 个可故障
消费者enable.auto.commit=false手动提交 Offset
# 生产环境可靠性配置清单
# Broker 端
default.replication.factor=3
min.insync.replicas=2
unclean.leader.election.enable=false
# Producer 端
acks=all
retries=2147483647
max.in.flight.requests.per.connection=5
enable.idempotence=true
# Consumer 端
enable.auto.commit=false
auto.offset.reset=earliest

二、ACK 策略深入#

2.1 三种 ACK 级别#

ACK 级别行为可靠性延迟适用场景
acks=0不等待任何确认最低最低日志收集、指标
acks=1等待 Leader 确认中等中等一般业务
acks=all等待所有 ISR 确认最高最高金融交易、订单
// ACK 策略配置与使用
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// acks=0:发完就忘
props.put(ProducerConfig.ACKS_CONFIG, "0");
// 适用于:日志收集,丢失少量日志可接受
// acks=1:Leader 确认
props.put(ProducerConfig.ACKS_CONFIG, "1");
// 适用于:一般业务,Leader 写入即返回
// 风险:Leader 确认后宕机,Follower 未同步
// acks=all:全部 ISR 确认(推荐)
props.put(ProducerConfig.ACKS_CONFIG, "all");
// 适用于:金融交易、订单等不可丢失场景
// 注意:必须配合 min.insync.replicas 使用

2.2 acks=all 与 min.insync.replicas 的配合#

acks=all 中的”all”指的是 ISR 中的所有副本,而非所有副本。这意味着:

  • 如果 ISR 只有 Leader 一个副本,acks=all 等同于 acks=1
  • min.insync.replicas 限制了 ISR 的最小数量
graph TB subgraph "场景1:ISR=3(正常)" L1["Leader<br/>已写入"] F1_1["Follower 1<br/>已写入"] F1_2["Follower 2<br/>已写入"] L1 -->|acks=all| OK1["返回成功"] end subgraph "场景2:ISR=2(一个副本落后)" L2["Leader<br/>已写入"] F2_1["Follower 1<br/>已写入"] F2_2["Follower 2<br/>不在 ISR"] L2 -->|acks=all<br/>min.insync=2| OK2["返回成功"] end subgraph "场景3:ISR=1(危险)" L3["Leader<br/>已写入"] F3_1["Follower 1<br/>不在 ISR"] F3_2["Follower 2<br/>不在 ISR"] L3 -->|acks=all<br/>min.insync=2| FAIL["拒绝写入<br/>NotEnoughReplicas"] end
# min.insync.replicas 配置
# Broker 级别(默认值)
min.insync.replicas=1
# Topic 级别(推荐覆盖)
kafka-configs --alter --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders \
--add-config min.insync.replicas=2
# 验证配置
kafka-configs --describe --bootstrap-server localhost:9092 \
--entity-type topics --entity-name orders
Warning

min.insync.replicas 设置过大可能导致集群不可用。例如 3 副本 + min.insync.replicas=3,意味着所有 3 个副本都必须在线才能写入——任何一个 Broker 宕机都会导致写入失败。推荐配置:replication.factor=3 + min.insync.replicas=2

三、幂等生产者#

3.1 消息重复的原因#

在网络不稳定的环境中,生产者重试可能导致消息重复:

场景原因结果
生产者发送成功,ACK 丢失网络抖动生产者重试 → 消息重复
生产者发送成功,ACK 延迟网络延迟超时重试 → 消息重复
Broker 写入成功,响应前宕机Broker 故障生产者重试 → 消息重复

3.2 幂等生产者原理#

Kafka 0.11+ 引入幂等生产者,通过 Producer ID (PID) 和 Sequence Number 实现去重:

// 启用幂等生产者
Properties props = new Properties();
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// 等价于以下配置:
// acks=all
// retries=Integer.MAX_VALUE
// max.in.flight.requests.per.connection<=5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 幂等生产者自动为每条消息分配:
// 1. PID (Producer ID) - 生产者启动时由 Broker 分配
// 2. Sequence Number - 每个 <PID, Topic, Partition> 递增
// Broker 端检查:如果收到的 SeqNum <= 已提交的 SeqNum,则判定为重复
sequenceDiagram participant P as Producer (PID=1) participant B as Broker P->>B: InitProducerId<br/>(获取 PID) B->>P: PID=1, Epoch=0 P->>B: Produce(PID=1, Seq=0)<br/>消息 A B->>B: 写入 Seq=0 B->>P: ACK P->>B: Produce(PID=1, Seq=1)<br/>消息 B B->>B: 写入 Seq=1 Note over B: ACK 丢失! P->>B: Produce(PID=1, Seq=1)<br/>消息 B(重试) B->>B: Seq=1 已存在<br/>判定为重复,跳过 B->>P: ACK (不重复写入)

3.3 幂等的局限性#

维度幂等生产者事务生产者
去重范围单个 <PID, Topic, Partition>跨分区
跨分区不保证保证
跨会话不保证(PID 重新分配)保证(事务 ID)
消费端不保证保证(Read Committed)
性能开销中等
Note

幂等生产者只能保证单分区内的去重。如果生产者向多个分区发送消息,无法保证跨分区的 exactly-once。需要跨分区保证时,必须使用事务 API。

四、事务 API#

4.1 事务模型#

Kafka 事务支持”消费-处理-生产”的 exactly-once 语义:

graph LR subgraph "事务流程" C["消费 Topic A<br/>Offset 100-200"] --> P["处理"] P --> W["写入 Topic B<br/>+ 提交 Offset"] end subgraph "原子性保证" OK["事务提交<br/>Topic B 写入 + Offset 提交"] FAIL["事务回滚<br/>Topic B 不写入 + Offset 不提交"] end W --> OK W --> FAIL

4.2 事务 API 使用#

// 事务生产者配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-tx-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
props.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务
producer.initTransactions();
try {
// 开始事务
producer.beginTransaction();
// 从 Topic A 消费并处理
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// 处理消息
String result = processOrder(record.value());
// 写入 Topic B
producer.send(new ProducerRecord<>("orders-processed", record.key(), result));
}
// 提交消费 Offset(作为事务的一部分)
producer.sendOffsetsToTransaction(
getOffsetsToCommit(records),
consumer.groupMetadata()
);
// 提交事务(原子性:写入 + Offset 提交)
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 另一个具有相同 transactional.id 的生产者已启动
producer.close();
} catch (KafkaException e) {
// 事务失败,回滚
producer.abortTransaction();
}

4.3 事务隔离级别#

消费者端需要配置隔离级别来控制是否读取事务消息:

隔离级别行为适用场景
read_uncommitted读取所有消息(包括未提交事务)默认,性能优先
read_committed只读取已提交事务的消息需要 exactly-once
// 消费者配置
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// read_committed 消费者不会返回未提交事务的消息
// 它会等待事务提交或超时后才返回消息
// 事务超时配置
// Producer 端
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000"); // 60 秒
// Broker 端
// transaction.max.timeout.ms=900000 // 最大 15 分钟
// transactional.id.expiration.ms=604800000 // 7 天过期

4.4 事务协调器#

sequenceDiagram participant P as Producer participant TC as TransactionCoordinator participant L as Topic Partition Leader participant CT as __transaction_state P->>TC: InitProducerId(transactionalId) TC->>CT: 写入事务日志<br/>(PID, Epoch) TC->>P: PID + Epoch P->>TC: BeginTransaction TC->>CT: 写入状态=ONGOING P->>L: AddPartitionsToTxn(partitions) L->>TC: 确认分区加入事务 P->>L: Produce(records with PID, Seq) L->>L: 写入消息(标记为事务消息) P->>TC: CommitTransaction TC->>CT: 写入状态=PREPARE_COMMIT TC->>L: 写入 COMMIT 标记到每个分区 TC->>CT: 写入状态=COMPLETE_COMMIT TC->>P: 事务提交成功

五、消费者可靠性#

5.1 Offset 提交策略#

策略实现方式语义保证风险
自动提交enable.auto.commit=trueat-most-once消息丢失
同步手动提交commitSync()at-least-once重复消费
异步手动提交commitAsync()at-least-once重复消费 + 丢失
组合提交commitAsync() + commitSync()at-least-once推荐
// 组合提交模式(推荐)
try {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
// 正常情况异步提交(高性能)
consumer.commitAsync();
}
} finally {
try {
// 关闭前同步提交(确保最后一次提交成功)
consumer.commitSync();
} finally {
consumer.close();
}
}
// 精确 Offset 提交(处理完每条消息后)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)
);
} catch (Exception e) {
// 处理失败,不提交该消息的 Offset
log.error("处理消息失败: offset={}", record.offset(), e);
}
}
if (!offsets.isEmpty()) {
consumer.commitSync(offsets);
}

5.2 消费者重试与死信队列#

// 消费者重试 + 死信队列模式
public class RetryConsumer {
private final KafkaProducer<String, String> dlqProducer;
private final int maxRetries;
public void consume() {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
int retryCount = getRetryCount(record);
if (retryCount >= maxRetries) {
// 超过重试次数,发送到死信队列
dlqProducer.send(new ProducerRecord<>(
"orders-dlq",
record.key(),
enrichWithRetryInfo(record, retryCount)
));
} else {
try {
processRecord(record);
} catch (Exception e) {
// 重试:发送到重试 Topic
dlqProducer.send(new ProducerRecord<>(
"orders-retry",
record.key(),
enrichWithRetryInfo(record, retryCount + 1)
));
}
}
}
consumer.commitSync();
}
}
}

六、端到端 Exactly-Once#

6.1 Exactly-Once 的挑战#

在分布式系统中,实现端到端 exactly-once 极其困难:

挑战原因解决方案
生产者重复网络重试幂等生产者
消费者重复Offset 提交与处理不同步事务 API
跨系统一致性Kafka 与数据库无法原子操作事务性发件箱
消费者重启状态丢失状态存储 + 检查点

6.2 Kafka Streams 的 exactly-once#

// Kafka Streams exactly-once 配置
Properties props = new Properties();
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// Kafka 3.0+ 使用 EXACTLY_ONCE_V2(更高效的事务协议)
// Kafka Streams 内部使用事务 API 保证 exactly-once
// 1. 消费输入 Topic
// 2. 处理记录
// 3. 写入输出 Topic + 更新状态存储 + 提交 Offset
// 以上三步在同一个事务中完成
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> orders = builder.stream("orders");
orders.groupByKey()
.aggregate(
() -> 0.0, // 初始值
(key, value, aggregate) -> aggregate + parseAmount(value),
Materialized.with(Serdes.String(), Serdes.Double())
)
.toStream()
.to("order-totals");

七、可靠性监控#

7.1 关键监控指标#

指标含义告警条件
record-error-rate生产者发送错误率> 0
record-retry-rate生产者重试率> 0 持续增长
record-send-rate发送速率突然下降
commit-latency-avgOffset 提交延迟> 1s
records-lag-max消费者最大落后量> 10000
UnderReplicatedPartitionsISR 不足的分区数> 0
# 消费者 Lag 监控
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group order-service
# 生产者指标(JMX)
# kafka.producer:type=producer-metrics,client-id=*
# kafka.producer:type=producer-topic-metrics,client-id=*,topic=*
# Broker 可靠性指标
# kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
# kafka.server:type=ReplicaManager,name=OfflinePartitionsCount
# kafka.server:type=TransactionCoordinator,name=*

7.2 可靠性测试#

# 模拟 Broker 宕机
# 1. 启动 3 Broker 集群
# 2. 启动生产者持续发送消息
# 3. 杀掉一个 Broker
kill -9 <broker-pid>
# 4. 验证消息不丢失
kafka-console-consumer --bootstrap-server localhost:9092 \
--topic orders --from-beginning \
--property print.offset=true
# 模拟网络分区
iptables -A INPUT -s <broker-ip> -j DROP
iptables -D INPUT -s <broker-ip> -j DROP
# 性能测试(可靠性配置下)
kafka-producer-perf-test --topic orders \
--num-records 1000000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=localhost:9092 \
acks=all \
enable.idempotence=true
Tip

在生产环境中,可靠性测试应该包含:Broker 宕机恢复、网络分区、磁盘故障、Controller 切换等场景。使用 Chaos Engineering 方法(如 Chaos Monkey)定期验证系统的容错能力。

八、可靠性配置速查表#

场景acksmin.insync.replicas幂等事务适用
日志收集01可丢可重
用户行为追踪11可重不可丢
订单处理all2不可丢不可重
支付交易all2不可丢不可重

九、总结#

上一章探讨了Kafka 存储与零拷贝。

维度关键要点
ACK 策略acks=all + min.insync.replicas=2 是可靠性的基础配置
幂等生产者解决单分区内消息重复,启用 enable.idempotence=true
事务 API解决跨分区 exactly-once,支持”消费-处理-生产”原子操作
消费者可靠性手动提交 Offset + 重试 + 死信队列,保证 at-least-once
端到端Kafka Streams 提供 exactly-once-v2,是最简单的端到端方案
监控record-error-rate、records-lag-max、UnderReplicatedPartitions 是关键指标
Warning

exactly-once 不是银弹。事务 API 有性能开销(约 20-30% 吞吐下降),且增加了系统复杂度。在选择可靠性级别时,要权衡业务需求与性能成本——大多数场景下 at-least-once + 幂等消费已经足够。

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Kafka 可靠性与 exactly-once
https://blog.souloss.com/posts/messaging/kafka-reliability/
作者
Souloss
发布于
2026-03-29
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时