分布式系统有一个反直觉的事实:精确一次(exactly-once)投递在理论上不可能实现。网络会断、进程会崩、时钟会漂——任何一次重试都可能产生重复消息,而避免重复的唯一方式是不重试,代价是丢消息。但业务系统依然要求”不丢不重”,这并非自相矛盾——exactly-once 语义不是靠网络保证的,而是靠幂等性和事务在应用层实现的。理解 at-most-once、at-least-once、exactly-once 三种语义的实现代价,是设计可靠消息系统的第一步。
一、三种消息语义
1.1 At-Most-Once(最多一次)
消息可能丢失,但不会重复。这是最简单的语义——发送后不管,消费者处理完不确认也无所谓。 At-Most-Once 的实现方式
At-Most-Once 本质上就是”发后不管”——生产者不等待确认,消费者自动确认。来看各系统的实现:
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "0"); // 不等确认props.put("enable.idempotence", "false");KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<>("logs", "cpu=90%")); // 发完就忘// 消费者自动提交 offsetprops.put("enable.auto.commit", "true");props.put("auto.commit.interval.ms", "1000"); // 每秒自动提交KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='metrics', durable=False) # 不持久化channel.basic_consume(queue='metrics', auto_ack=True, on_message_callback=lambda ch, method, properties, body: process_metric(body) # 处理失败?消息已经没了)channel.start_consuming()func produceAtMostOnce(writer *kafka.Writer, topic string, msg []byte) { // 不等待写入结果,直接发 writer.WriteMessages(context.Background(), kafka.Message{ Topic: topic, Value: msg, }) // 不检查返回值 → 消息可能丢失}At-Most-Once 的适用场景:
| 场景 | 为什么可接受丢失 | 配置建议 |
|---|---|---|
| 应用日志收集 | 丢几条日志不影响分析 | Kafka acks=0 |
| 监控指标上报 | 下一个采集周期会覆盖 | Kafka acks=0 |
| 用户行为埋点 | 统计维度允许误差 | Kafka acks=0 或 1 |
| 心跳信号 | 下一次心跳会补上 | 不持久化 |
1.2 At-Least-Once(至少一次)
消息不会丢失,但可能重复。生产者重试直到确认,消费者处理完才确认。 At-Least-Once 的实现方式
At-Least-Once 的核心是”重试直到成功”——生产者重试发送,消费者手动确认:
Properties producerProps = new Properties();producerProps.put("bootstrap.servers", "localhost:9092");producerProps.put("acks", "all"); // 等待所有副本确认producerProps.put("retries", Integer.MAX_VALUE); // 无限重试producerProps.put("max.in.flight.requests.per.connection", "1"); // 保证重试不乱序KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);// 消费者手动提交Properties consumerProps = new Properties();consumerProps.put("bootstrap.servers", "localhost:9092");consumerProps.put("enable.auto.commit", "false"); // 关闭自动提交consumerProps.put("group.id", "order-service");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord<String, String> record : records) { processOrder(record.value()); // 先处理 } consumer.commitSync(); // 处理完再提交 → 如果处理失败,下次重新消费}import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.queue_declare(queue='orders', durable=True) # 持久化队列channel.basic_qos(prefetch_count=1) # 一次只拿一条def callback(ch, method, properties, body): try: process_order(body) ch.basic_ack(delivery_tag=method.delivery_tag) # 处理成功,确认 except Exception as e: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)channel.basic_consume(queue='orders', on_message_callback=callback)channel.start_consuming()func produceAtLeastOnce(writer *kafka.Writer, topic string, key, value []byte) error { maxRetries := 3 for i := 0; i < maxRetries; i++ { err := writer.WriteMessages(context.Background(), kafka.Message{ Topic: topic, Key: key, Value: value, }) if err == nil { return nil } log.Printf("发送失败,第 %d 次重试: %v", i+1, err) time.Sleep(time.Second * time.Duration(i+1)) // 指数退避 } return fmt.Errorf("重试 %d 次后仍然失败", maxRetries)}At-Least-Once 重复产生的根因
消息重复不是”bug”,而是 At-Least-Once 语义的必然结果。以下是重复产生的典型场景:
| 场景 | 原因 | 结果 |
|---|---|---|
| 生产者超时重试 | Broker 收到消息但 ACK 丢失 | 消息被写入两次 |
| 消费者处理完但提交失败 | offset 提交前消费者宕机 | 重启后重新消费已处理的消息 |
| 网络抖动 | ACK 在网络中丢失 | 生产者重发消息 |
| Rebalance | 消费者组 Rebalance 期间 offset 未提交 | 新消费者从旧 offset 开始消费 |
1.3 Exactly-Once(恰好一次)
消息既不丢失也不重复。这是最难实现的语义——需要生产者和消费者协同保证。
| 语义 | 丢失 | 重复 | 实现复杂度 | 性能 | 适用场景 |
|---|---|---|---|---|---|
| At-Most-Once | 可能 | 不会 | 低 | 最高 | 日志、指标 |
| At-Least-Once | 不会 | 可能 | 中 | 高 | 大多数业务 |
| Exactly-Once | 不会 | 不会 | 高 | 中 | 金融、计费 |
二、幂等性设计
2.1 什么是幂等性
幂等性:同一操作执行一次和多次的效果相同。
// 非幂等:每次调用增加余额void deposit(String userId, int amount) { balance += amount; // 重复调用 → 余额错误}// 幂等:基于唯一ID去重void deposit(String userId, String txId, int amount) { if (processedTxs.contains(txId)) return; // 已处理,跳过 balance += amount; processedTxs.add(txId);}2.2 幂等性实现策略
| 策略 | 实现 | 优点 | 缺点 |
|---|---|---|---|
| 唯一ID去重 | 消息ID + 去重表 | 简单 | 去重表需要维护 |
| 数据库唯一约束 | INSERT IGNORE / ON CONFLICT | 可靠 | 数据库压力 |
| Redis SET | SETNX 原子操作 | 快速 | Redis 宕机风险 |
| 乐观锁 | 版本号 + CAS | 无额外存储 | 并发冲突 |
-- 数据库唯一约束实现幂等CREATE TABLE orders ( id BIGINT PRIMARY KEY, order_id VARCHAR(64) UNIQUE, -- 业务唯一ID amount DECIMAL(10,2), status VARCHAR(16));-- 幂等插入:重复插入会被唯一约束拒绝INSERT INTO orders (id, order_id, amount, status)VALUES (1, 'ORD-2026-001', 99.99, 'PAID')ON CONFLICT (order_id) DO NOTHING;2.3 幂等性实现的工程细节
Redis SETNX 实现
import redisimport jsonr = redis.Redis(host='localhost', port=6379, db=0)def process_message_idempotent(msg_id, payload): """基于 Redis SETNX 的幂等消费""" key = f"msg:processed:{msg_id}" acquired = r.set(key, "1", nx=True, ex=86400) # 24小时过期 if not acquired: print(f"消息 {msg_id} 已处理,跳过") return try: process(payload) except Exception as e: r.delete(key) # 处理失败,删除标记,允许重试 raise e乐观锁实现
-- 乐观锁:版本号控制CREATE TABLE account ( id BIGINT PRIMARY KEY, user_id VARCHAR(64), balance DECIMAL(10,2), version INT DEFAULT 0 -- 版本号);-- 幂等更新:版本号必须匹配UPDATE accountSET balance = balance + 100, version = version + 1WHERE user_id = 'user-123' AND version = 5;-- 如果 version 不匹配(已被其他请求更新),影响行数为 0去重表实现
// Java: 去重表 + 业务操作在同一事务中@Transactionalpublic void processOrder(Message msg) { String msgId = msg.getId(); // 1. 检查去重表 if (dedupRepository.existsById(msgId)) { log.info("消息 {} 已处理,跳过", msgId); return; } // 2. 执行业务逻辑 orderService.createOrder(parseOrder(msg)); // 3. 写入去重表(同一事务,保证原子性) dedupRepository.save(new DedupRecord(msgId, Instant.now()));}幂等性是 At-Least-Once 语义的”补丁”——它不保证消息不重复投递,但保证重复投递不会产生副作用。在大多数业务场景中,At-Least-Once + 幂等性已经足够,不需要追求 Exactly-Once。
三、Exactly-Once 的三条实现路径
Exactly-Once 不是单一技术,而是三条不同的实现路径,各有适用场景:
| 路径 | 解决的问题 | 实现方式 | 性能影响 | 适用场景 |
|---|---|---|---|---|
| 幂等生产者 | 生产者重试导致消息重复写入 | PID + SeqNum 去重 | ~5-10% | Kafka 生产端 |
| 事务消息 | 跨分区原子写入、消费-处理-生产原子性 | 两阶段提交 + 事务日志 | ~20-50% | Kafka Streams、流处理 |
| 消费者去重 | 消费端重复处理 | 去重表 / 唯一约束 | 取决于去重实现 | 通用业务场景 |
四、Kafka 幂等生产者深入
4.1 PID + Sequence Number 机制
Kafka 的幂等生产者通过 Producer ID(PID)+ Sequence Number 实现单分区内去重:
关键细节:
| 概念 | 说明 |
|---|---|
| PID(Producer ID) | 每个生产者实例启动时由 Broker 分配,唯一标识一个生产者 |
| Sequence Number | 每个分区维护独立的递增序列号,从 0 开始 |
| Epoch | PID 的代数,用于隔离僵尸生产者(Zombie Fencing) |
| 去重范围 | 仅保证单分区内的消息去重,不跨分区 |
// 启用幂等生产者Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("enable.idempotence", "true"); // 开启幂等// 以下参数会自动设置:// acks = all// retries = Integer.MAX_VALUE// max.in.flight.requests.per.connection <= 5KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息,Broker 自动去重producer.send(new ProducerRecord<>("orders", "order-1", "created"));producer.send(new ProducerRecord<>("orders", "order-1", "created")); // 重复,Broker 丢弃4.2 幂等生产者的局限
| 局限 | 说明 | 影响 |
|---|---|---|
| 仅单分区去重 | Sequence Number 按分区维护 | 跨分区消息无法去重 |
| 仅防生产者重试 | 不防消费者重复消费 | 消费端仍需幂等 |
| PID 重启后变化 | 生产者重启获得新 PID | 旧 PID 的去重状态丢失 |
| 序列号窗口有限 | Broker 只缓存最近的序列号 | 超出窗口的重复无法检测 |
| 不跨会话 | PID 在生产者重启后变化 | 无法实现跨重启的 Exactly-Once |
幂等生产者只解决了”生产者重试导致消息重复写入”这一个问题。它不解决消费者重复消费、跨分区原子性、跨重启一致性等问题。不要以为开了 enable.idempotence=true 就万事大吉了。
五、Kafka 事务深入
5.1 事务协调器(Transaction Coordinator)
Kafka 事务通过事务协调器管理事务状态,事务日志(__transaction_state)记录事务进度:
事务状态机:
| 状态 | 说明 | 转换条件 |
|---|---|---|
| EMPTY | 事务未开始 | InitProducerId → ONGOING |
| ONGOING | 事务进行中 | CommitTransaction → PREPARE_COMMIT |
| PREPARE_COMMIT | 准备提交 | 写入 Commit Marker → COMPLETE_COMMIT |
| COMPLETE_COMMIT | 提交完成 | — |
| PREPARE_ABORT | 准备回滚 | 写入 Abort Marker → COMPLETE_ABORT |
| COMPLETE_ABORT | 回滚完成 | — |
5.2 Kafka 事务代码实现
// Kafka 事务性生产者:跨分区原子写入Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("transactional.id", "order-tx-1"); // 事务ID,跨重启保持一致KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 初始化事务(只执行一次)producer.initTransactions();try { producer.beginTransaction(); // 往多个 Topic 原子写入 producer.send(new ProducerRecord<>("orders", "order-1", "created")); producer.send(new ProducerRecord<>("inventory", "order-1", "reserved")); producer.send(new ProducerRecord<>("notifications", "order-1", "pending")); // 提交事务:要么全部写入,要么全部不写入 producer.commitTransaction();} catch (Exception e) { // 回滚事务:所有消息对消费者不可见 producer.abortTransaction();}5.3 消费端 Exactly-Once:Read-Process-Commit
Kafka Streams 默认提供消费端的 Exactly-Once 语义——消费、处理、生产、提交 offset 在同一事务中:
Properties props = new Properties();props.put("application.id", "order-processing");props.put("bootstrap.servers", "localhost:9092");props.put("processing.guarantee", "exactly_once_v2"); // Exactly-Once 语义StreamsBuilder builder = new StreamsBuilder();builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String())) .process(() -> new Processor<String, String>() { @Override public void process(Record<String, String> record) { // 1. 消费订单消息 Order order = parseOrder(record.value()); // 2. 处理业务逻辑 InventoryResult result = inventoryService.deduct(order); // 3. 输出结果(在同一事务中) context().forward(new Record<>(order.getId(), result.toString())); } }) .to("inventory-results");KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();手动实现 Read-Process-Commit:
// 手动实现消费端 Exactly-OnceKafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);consumer.subscribe(Collections.singletonList("orders"));while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()) continue; producer.beginTransaction(); try { for (ConsumerRecord<String, String> record : records) { // 处理消息 String result = processOrder(record.value()); // 输出结果 producer.send(new ProducerRecord<>("results", record.key(), result)); } // 在同一事务中提交 offset Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(); for (ConsumerRecord<String, String> record : records) { offsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); } producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); producer.commitTransaction(); } catch (Exception e) { producer.abortTransaction(); }}六、RocketMQ 事务消息深入
6.1 半消息机制
RocketMQ 的事务消息通过”半消息”实现本地事务与消息发送的原子性:
6.2 RocketMQ 事务消息代码实现
// RocketMQ 事务消息完整实现TransactionMQProducer producer = new TransactionMQProducer("order-producer-group");producer.setNamesrvAddr("localhost:9876");// 事务回查线程池ExecutorService executorService = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> new Thread(r, "transaction-check-thread"));producer.setExecutorService(executorService);// 事务监听器producer.setTransactionListener(new TransactionListener() { // 执行本地事务 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderId = msg.getKeys(); try { // 本地事务:创建订单 Order order = parseOrder(msg.getBody()); orderService.createOrder(order); log.info("订单 {} 创建成功,提交消息", orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (DuplicateKeyException e) { // 订单已存在(幂等),提交消息 log.info("订单 {} 已存在,提交消息", orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("订单 {} 创建失败,回滚消息", orderId, e); return LocalTransactionState.ROLLBACK_MESSAGE; } } // 回查本地事务状态 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = msg.getKeys(); Order order = orderService.getById(orderId); if (order != null) { log.info("回查:订单 {} 存在,提交消息", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.warn("回查:订单 {} 不存在,回滚消息", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } }});producer.start();// 发送事务消息Message msg = new Message("order-events", "TagA", "{\"orderId\":\"ORD-001\",\"amount\":99.9}".getBytes());msg.setKeys("ORD-001"); // 业务唯一ID,用于回查producer.sendMessageInTransaction(msg, null);6.3 RocketMQ 事务消息的注意事项
| 注意事项 | 说明 | 建议 |
|---|---|---|
| 回查频率 | 默认 15 秒回查一次,最多回查 15 次 | 根据业务调整 transactionCheckMax 和 transactionCheckInterval |
| 本地事务耗时 | 本地事务不能太长,否则触发回查 | 控制在 10s 以内 |
| 半消息存储 | 半消息存在特殊 Topic,占用 Broker 存储 | 定期清理已提交/回滚的半消息 |
| 幂等性 | 回查可能多次调用 checkLocalTransaction | 本地事务查询必须幂等 |
| 事务状态 | UNKNOW 状态会触发回查 | 尽量返回 COMMIT 或 ROLLBACK,避免 UNKNOW |
七、RabbitMQ 的投递保证
7.1 生产者确认(Publisher Confirms)
RabbitMQ 通过 Publisher Confirms 机制保证消息到达 Broker:
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.confirm_delivery()try: channel.basic_publish( exchange='', routing_key='orders', body='order-123', properties=pika.BasicProperties(delivery_mode=2) # 持久化 ) print("消息确认到达 Broker")except pika.exceptions.UnroutableError: print("消息无法路由,可能丢失")// Java: RabbitMQ 异步确认channel.confirmSelect(); // 开启确认模式// 添加确认回调channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { // 消息确认到达 Broker pendingMessages.remove(deliveryTag); } @Override public void handleNack(long deliveryTag, boolean multiple) { // 消息被拒绝,需要重发 Message msg = pendingMessages.get(deliveryTag); resend(msg); }});7.2 消费者确认与重投递
import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))channel = connection.channel()channel.exchange_declare(exchange='dlx', exchange_type='direct')channel.queue_declare(queue='orders-dlq') # 死信队列channel.queue_bind(queue='orders-dlq', exchange='dlx', routing_key='orders')args = { 'x-dead-letter-exchange': 'dlx', 'x-dead-letter-routing-key': 'orders', 'x-message-ttl': 60000, # 消息 TTL 60秒}channel.queue_declare(queue='orders', durable=True, arguments=args)max_retries = 3def callback(ch, method, properties, body): headers = properties.headers or {} retry_count = headers.get('x-retry-count', 0) try: process_order(body) ch.basic_ack(delivery_tag=method.delivery_tag) except Exception as e: if retry_count < max_retries: ch.basic_publish( exchange='', routing_key='orders', body=body, properties=pika.BasicProperties( delivery_mode=2, headers={'x-retry-count': retry_count + 1} ) ) ch.basic_ack(delivery_tag=method.delivery_tag) # 确认原消息 else: ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False) log.error(f"消息处理失败,进入死信队列: {body}")channel.basic_qos(prefetch_count=1)channel.basic_consume(queue='orders', on_message_callback=callback)channel.start_consuming()7.3 RabbitMQ 投递保证级别
| 配置 | 语义 | 说明 |
|---|---|---|
| 发布不确认 + 自动 ACK | At-Most-Once | 最快,可能丢消息 |
| 发布确认 + 手动 ACK | At-Least-Once | 推荐,可能重复 |
| 发布确认 + 手动 ACK + 幂等消费 | 等效 Exactly-Once | 最可靠,需要业务层配合 |
八、各系统的投递保证对比
| 维度 | Kafka | RabbitMQ | RocketMQ |
|---|---|---|---|
| At-Most-Once | acks=0 + auto commit | auto_ack | — |
| At-Least-Once | acks=all + 手动 commit | Publisher Confirm + 手动 ACK | 默认语义 |
| Exactly-Once(生产端) | 幂等生产者 (PID+SeqNum) | — | — |
| Exactly-Once(事务) | 事务 API (transactional.id) | — | 事务消息(半消息) |
| Exactly-Once(消费端) | Kafka Streams / Read-Process-Commit | 幂等消费 | 幂等消费 |
| 消息持久化 | 日志文件(顺序写入) | 持久化队列(delivery_mode=2) | CommitLog(顺序写入) |
| 副本机制 | ISR 副本同步 | 镜像队列(HA) | 主从同步 |
| 死信处理 | 无内置(需自行实现) | DLX + TTL | 延迟等级 + 死信 Topic |
8.1 Kafka 投递保证配置速查
| 语义 | 生产者配置 | 消费者配置 |
|---|---|---|
| At-Most-Once | acks=0 | enable.auto.commit=true |
| At-Least-Once | acks=all, retries=MAX | enable.auto.commit=false, 手动 commit |
| Exactly-Once(幂等) | enable.idempotence=true | 同 At-Least-Once |
| Exactly-Once(事务) | transactional.id=xxx | isolation.level=read_committed |
8.2 RabbitMQ 投递保证配置速查
| 语义 | 生产者 | 消费者 |
|---|---|---|
| At-Most-Once | 不开 confirm | auto_ack=true |
| At-Least-Once | confirm_select() + 持久化 | auto_ack=false + 手动 ACK |
| 等效 Exactly-Once | 同 At-Least-Once | 同 At-Least-Once + 幂等去重 |
九、语义选择决策指南
9.1 决策流程
9.2 场景与语义匹配
| 业务场景 | 推荐语义 | 实现方式 | 理由 |
|---|---|---|---|
| 日志收集 | At-Most-Once | Kafka acks=0 | 丢几条日志无影响,吞吐量优先 |
| 监控指标 | At-Most-Once | Kafka acks=0 | 下一个采集周期覆盖,丢失可接受 |
| 用户行为埋点 | At-Most-Once | Kafka acks=1 | 统计维度允许误差 |
| 订单创建 | At-Least-Once + 幂等 | Kafka acks=all + 唯一约束 | 订单不能丢,重复用唯一约束去重 |
| 支付通知 | At-Least-Once + 幂等 | RabbitMQ Confirm + 去重表 | 支付不能丢,重复用去重表处理 |
| 库存扣减 | At-Least-Once + 乐观锁 | Kafka + 版本号 | 库存不能丢,重复用乐观锁拦截 |
| 转账交易 | Exactly-Once | Kafka 事务 | 金额不能出错,必须原子性 |
| 计费系统 | Exactly-Once | Kafka 事务 | 重复计费不可接受 |
| 订单创建+消息发送 | Exactly-Once | RocketMQ 事务消息 | 本地事务与消息必须一致 |
9.3 性能对比
| 语义 | Kafka 吞吐量(相对值) | RabbitMQ 吞吐量(相对值) | 延迟影响 |
|---|---|---|---|
| At-Most-Once | 100% | 100% | 无 |
| At-Least-Once | ~85% | ~80% | +1-5ms |
| Exactly-Once(幂等) | ~90% | — | +1-2ms |
| Exactly-Once(事务) | ~50-80% | — | +10-50ms |
Exactly-Once 的性能代价不是线性的——事务越小,开销占比越大。如果你的事务只包含 1-2 条消息,事务开销可能占到 50% 以上。如果包含 100 条消息,开销可能只有 10-20%。在设计事务时,尽量批量提交,减少事务频率。
十、总结
| 语义 | 丢失风险 | 重复风险 | 实现代价 | 推荐场景 |
|---|---|---|---|---|
| At-Most-Once | 有 | 无 | 低 | 日志、指标 |
| At-Least-Once | 无 | 有 | 中 | 通用业务(+幂等) |
| Exactly-Once | 无 | 无 | 高 | 金融、计费 |
| 实现路径 | 解决的问题 | 性能影响 | 适用系统 |
|---|---|---|---|
| 幂等生产者 | 生产端重复写入 | ~5-10% | Kafka |
| 事务消息 | 跨分区原子性 / 本地事务+消息一致性 | ~20-50% | Kafka / RocketMQ |
| 消费者去重 | 消费端重复处理 | 取决于实现 | 通用 |
大多数系统应该选择 At-Least-Once + 幂等性——它比 Exactly-Once 简单得多,性能也好得多。只有在”重复处理不可接受”的场景(如金融交易)才需要 Exactly-Once。选择语义时,从最简单的开始,只在必要时升级。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






