mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2734 字
8 分钟
消息语义:at-most-once 到 exactly-once
2026-03-13

分布式系统有一个反直觉的事实:精确一次(exactly-once)投递在理论上不可能实现。网络会断、进程会崩、时钟会漂——任何一次重试都可能产生重复消息,而避免重复的唯一方式是不重试,代价是丢消息。但业务系统依然要求”不丢不重”,这并非自相矛盾——exactly-once 语义不是靠网络保证的,而是靠幂等性事务在应用层实现的。理解 at-most-once、at-least-once、exactly-once 三种语义的实现代价,是设计可靠消息系统的第一步。

一、三种消息语义#

1.1 At-Most-Once(最多一次)#

消息可能丢失,但不会重复。这是最简单的语义——发送后不管,消费者处理完不确认也无所谓。 At-Most-Once 的实现方式

At-Most-Once 本质上就是”发后不管”——生产者不等待确认,消费者自动确认。来看各系统的实现:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "0"); // 不等确认
props.put("enable.idempotence", "false");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("logs", "cpu=90%")); // 发完就忘
// 消费者自动提交 offset
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000"); // 每秒自动提交
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='metrics', durable=False) # 不持久化
channel.basic_consume(queue='metrics', auto_ack=True,
on_message_callback=lambda ch, method, properties, body:
process_metric(body) # 处理失败?消息已经没了
)
channel.start_consuming()
func produceAtMostOnce(writer *kafka.Writer, topic string, msg []byte) {
// 不等待写入结果,直接发
writer.WriteMessages(context.Background(), kafka.Message{
Topic: topic,
Value: msg,
})
// 不检查返回值 → 消息可能丢失
}

At-Most-Once 的适用场景:

场景为什么可接受丢失配置建议
应用日志收集丢几条日志不影响分析Kafka acks=0
监控指标上报下一个采集周期会覆盖Kafka acks=0
用户行为埋点统计维度允许误差Kafka acks=0 或 1
心跳信号下一次心跳会补上不持久化

1.2 At-Least-Once(至少一次)#

消息不会丢失,但可能重复。生产者重试直到确认,消费者处理完才确认。 At-Least-Once 的实现方式

At-Least-Once 的核心是”重试直到成功”——生产者重试发送,消费者手动确认:

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("acks", "all"); // 等待所有副本确认
producerProps.put("retries", Integer.MAX_VALUE); // 无限重试
producerProps.put("max.in.flight.requests.per.connection", "1"); // 保证重试不乱序
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
// 消费者手动提交
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("enable.auto.commit", "false"); // 关闭自动提交
consumerProps.put("group.id", "order-service");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processOrder(record.value()); // 先处理
}
consumer.commitSync(); // 处理完再提交 → 如果处理失败,下次重新消费
}
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='orders', durable=True) # 持久化队列
channel.basic_qos(prefetch_count=1) # 一次只拿一条
def callback(ch, method, properties, body):
try:
process_order(body)
ch.basic_ack(delivery_tag=method.delivery_tag) # 处理成功,确认
except Exception as e:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()
func produceAtLeastOnce(writer *kafka.Writer, topic string, key, value []byte) error {
maxRetries := 3
for i := 0; i < maxRetries; i++ {
err := writer.WriteMessages(context.Background(), kafka.Message{
Topic: topic,
Key: key,
Value: value,
})
if err == nil {
return nil
}
log.Printf("发送失败,第 %d 次重试: %v", i+1, err)
time.Sleep(time.Second * time.Duration(i+1)) // 指数退避
}
return fmt.Errorf("重试 %d 次后仍然失败", maxRetries)
}

At-Least-Once 重复产生的根因

消息重复不是”bug”,而是 At-Least-Once 语义的必然结果。以下是重复产生的典型场景:

场景原因结果
生产者超时重试Broker 收到消息但 ACK 丢失消息被写入两次
消费者处理完但提交失败offset 提交前消费者宕机重启后重新消费已处理的消息
网络抖动ACK 在网络中丢失生产者重发消息
Rebalance消费者组 Rebalance 期间 offset 未提交新消费者从旧 offset 开始消费
sequenceDiagram participant P as 生产者 participant B as Broker participant C as 消费者 P->>B: 发送消息 M1 B-->>P: ACK 丢失(网络抖动) P->>B: 重试发送 M1 Note over B: M1 被写入两次 B->>C: 投递 M1 C->>C: 处理 M1 成功 C--xB: 提交 offset 失败(消费者宕机) Note over B: offset 未更新 B->>C: 重新投递 M1(新消费者) Note over C: M1 被处理两次

1.3 Exactly-Once(恰好一次)#

消息既不丢失也不重复。这是最难实现的语义——需要生产者和消费者协同保证。

语义丢失重复实现复杂度性能适用场景
At-Most-Once可能不会最高日志、指标
At-Least-Once不会可能大多数业务
Exactly-Once不会不会金融、计费
graph LR subgraph "语义升级路径" AMO["At-Most-Once<br/>发后不管"] -->|"加重试"| ALO["At-Least-Once<br/>可能重复"] ALO -->|"加去重/事务"| EO["Exactly-Once<br/>不丢不重"] end subgraph "每级代价" C1["代价:无"] C2["代价:消息重复"] C3["代价:性能下降 20-50%<br/>架构复杂度大幅增加"] end AMO -.-> C1 ALO -.-> C2 EO -.-> C3 style AMO fill:#e3f2fd,stroke:#1565c0 style ALO fill:#fff9c4,stroke:#f9a825 style EO fill:#c8e6c9,stroke:#2e7d32

二、幂等性设计#

2.1 什么是幂等性#

幂等性:同一操作执行一次和多次的效果相同。

// 非幂等:每次调用增加余额
void deposit(String userId, int amount) {
balance += amount; // 重复调用 → 余额错误
}
// 幂等:基于唯一ID去重
void deposit(String userId, String txId, int amount) {
if (processedTxs.contains(txId)) return; // 已处理,跳过
balance += amount;
processedTxs.add(txId);
}

2.2 幂等性实现策略#

策略实现优点缺点
唯一ID去重消息ID + 去重表简单去重表需要维护
数据库唯一约束INSERT IGNORE / ON CONFLICT可靠数据库压力
Redis SETSETNX 原子操作快速Redis 宕机风险
乐观锁版本号 + CAS无额外存储并发冲突
-- 数据库唯一约束实现幂等
CREATE TABLE orders (
id BIGINT PRIMARY KEY,
order_id VARCHAR(64) UNIQUE, -- 业务唯一ID
amount DECIMAL(10,2),
status VARCHAR(16)
);
-- 幂等插入:重复插入会被唯一约束拒绝
INSERT INTO orders (id, order_id, amount, status)
VALUES (1, 'ORD-2026-001', 99.99, 'PAID')
ON CONFLICT (order_id) DO NOTHING;

2.3 幂等性实现的工程细节#

Redis SETNX 实现

import redis
import json
r = redis.Redis(host='localhost', port=6379, db=0)
def process_message_idempotent(msg_id, payload):
"""基于 Redis SETNX 的幂等消费"""
key = f"msg:processed:{msg_id}"
acquired = r.set(key, "1", nx=True, ex=86400) # 24小时过期
if not acquired:
print(f"消息 {msg_id} 已处理,跳过")
return
try:
process(payload)
except Exception as e:
r.delete(key) # 处理失败,删除标记,允许重试
raise e

乐观锁实现

-- 乐观锁:版本号控制
CREATE TABLE account (
id BIGINT PRIMARY KEY,
user_id VARCHAR(64),
balance DECIMAL(10,2),
version INT DEFAULT 0 -- 版本号
);
-- 幂等更新:版本号必须匹配
UPDATE account
SET balance = balance + 100, version = version + 1
WHERE user_id = 'user-123' AND version = 5;
-- 如果 version 不匹配(已被其他请求更新),影响行数为 0

去重表实现

// Java: 去重表 + 业务操作在同一事务中
@Transactional
public void processOrder(Message msg) {
String msgId = msg.getId();
// 1. 检查去重表
if (dedupRepository.existsById(msgId)) {
log.info("消息 {} 已处理,跳过", msgId);
return;
}
// 2. 执行业务逻辑
orderService.createOrder(parseOrder(msg));
// 3. 写入去重表(同一事务,保证原子性)
dedupRepository.save(new DedupRecord(msgId, Instant.now()));
}
Note

幂等性是 At-Least-Once 语义的”补丁”——它不保证消息不重复投递,但保证重复投递不会产生副作用。在大多数业务场景中,At-Least-Once + 幂等性已经足够,不需要追求 Exactly-Once。

三、Exactly-Once 的三条实现路径#

Exactly-Once 不是单一技术,而是三条不同的实现路径,各有适用场景:

graph TB EO["Exactly-Once"] --> P1["路径1:幂等生产者<br/>防止消息重复写入"] EO --> P2["路径2:事务消息<br/>保证跨分区/跨系统原子性"] EO --> P3["路径3:消费者去重<br/>消费端幂等处理"] P1 --> P1D["Kafka Idempotent Producer<br/>PID + Sequence Number"] P2 --> P2D["Kafka 事务 / RocketMQ 事务消息<br/>两阶段提交"] P3 --> P3D["去重表 / 唯一约束<br/>消费端幂等"] style EO fill:#c8e6c9,stroke:#2e7d32 style P1 fill:#e3f2fd,stroke:#1565c0 style P2 fill:#fff3e0,stroke:#e65100 style P3 fill:#f3e5f5,stroke:#6a1b9a
路径解决的问题实现方式性能影响适用场景
幂等生产者生产者重试导致消息重复写入PID + SeqNum 去重~5-10%Kafka 生产端
事务消息跨分区原子写入、消费-处理-生产原子性两阶段提交 + 事务日志~20-50%Kafka Streams、流处理
消费者去重消费端重复处理去重表 / 唯一约束取决于去重实现通用业务场景

四、Kafka 幂等生产者深入#

4.1 PID + Sequence Number 机制#

Kafka 的幂等生产者通过 Producer ID(PID)+ Sequence Number 实现单分区内去重:

sequenceDiagram participant P as 生产者 (PID=1) participant B as Broker P->>B: InitProducerId 请求 B-->>P: 返回 PID=1, Epoch=0 P->>B: 发送 (PID=1, Seq=0) → Partition 0 B->>B: 保存 Seq=0 B-->>P: ACK P->>B: 发送 (PID=1, Seq=1) → Partition 0 B->>B: 保存 Seq=1 B-->>P: ACK 丢失 P->>B: 重试 (PID=1, Seq=1) → Partition 0 B->>B: Seq=1 已存在,丢弃重复 B-->>P: ACK(不重复写入)

关键细节:

概念说明
PID(Producer ID)每个生产者实例启动时由 Broker 分配,唯一标识一个生产者
Sequence Number每个分区维护独立的递增序列号,从 0 开始
EpochPID 的代数,用于隔离僵尸生产者(Zombie Fencing)
去重范围仅保证单分区内的消息去重,不跨分区
// 启用幂等生产者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true"); // 开启幂等
// 以下参数会自动设置:
// acks = all
// retries = Integer.MAX_VALUE
// max.in.flight.requests.per.connection <= 5
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 发送消息,Broker 自动去重
producer.send(new ProducerRecord<>("orders", "order-1", "created"));
producer.send(new ProducerRecord<>("orders", "order-1", "created")); // 重复,Broker 丢弃

4.2 幂等生产者的局限#

局限说明影响
仅单分区去重Sequence Number 按分区维护跨分区消息无法去重
仅防生产者重试不防消费者重复消费消费端仍需幂等
PID 重启后变化生产者重启获得新 PID旧 PID 的去重状态丢失
序列号窗口有限Broker 只缓存最近的序列号超出窗口的重复无法检测
不跨会话PID 在生产者重启后变化无法实现跨重启的 Exactly-Once
Warning

幂等生产者只解决了”生产者重试导致消息重复写入”这一个问题。它不解决消费者重复消费、跨分区原子性、跨重启一致性等问题。不要以为开了 enable.idempotence=true 就万事大吉了。

五、Kafka 事务深入#

5.1 事务协调器(Transaction Coordinator)#

Kafka 事务通过事务协调器管理事务状态,事务日志(__transaction_state)记录事务进度:

sequenceDiagram participant P as 事务生产者<br/>transactional.id=tx-1 participant TC as 事务协调器<br/>Broker participant TL as 事务日志<br/>__transaction_state participant TP as 目标 Partition P->>TC: InitProducerId (transactional.id=tx-1) TC->>TL: 记录 {tx-1, PID=100, Epoch=0} TC-->>P: 返回 PID=100, Epoch=0 P->>TC: BeginTransaction TC->>TL: 记录 {PID=100, State=ONGOING} P->>TP: 发送消息 (PID=100, Seq=0) P->>TC: AddPartitionsToTxn (Partition 0, 1) TC->>TL: 记录 {PID=100, Partitions=[0,1]} P->>TC: CommitTransaction TC->>TL: 记录 {PID=100, State=PREPARE_COMMIT} TC->>TP: 写入 Commit Marker TC->>TL: 记录 {PID=100, State=COMPLETE_COMMIT} TC-->>P: 事务提交成功

事务状态机:

状态说明转换条件
EMPTY事务未开始InitProducerId → ONGOING
ONGOING事务进行中CommitTransaction → PREPARE_COMMIT
PREPARE_COMMIT准备提交写入 Commit Marker → COMPLETE_COMMIT
COMPLETE_COMMIT提交完成
PREPARE_ABORT准备回滚写入 Abort Marker → COMPLETE_ABORT
COMPLETE_ABORT回滚完成

5.2 Kafka 事务代码实现#

// Kafka 事务性生产者:跨分区原子写入
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "order-tx-1"); // 事务ID,跨重启保持一致
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务(只执行一次)
producer.initTransactions();
try {
producer.beginTransaction();
// 往多个 Topic 原子写入
producer.send(new ProducerRecord<>("orders", "order-1", "created"));
producer.send(new ProducerRecord<>("inventory", "order-1", "reserved"));
producer.send(new ProducerRecord<>("notifications", "order-1", "pending"));
// 提交事务:要么全部写入,要么全部不写入
producer.commitTransaction();
} catch (Exception e) {
// 回滚事务:所有消息对消费者不可见
producer.abortTransaction();
}

5.3 消费端 Exactly-Once:Read-Process-Commit#

Kafka Streams 默认提供消费端的 Exactly-Once 语义——消费、处理、生产、提交 offset 在同一事务中:

Properties props = new Properties();
props.put("application.id", "order-processing");
props.put("bootstrap.servers", "localhost:9092");
props.put("processing.guarantee", "exactly_once_v2"); // Exactly-Once 语义
StreamsBuilder builder = new StreamsBuilder();
builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()))
.process(() -> new Processor<String, String>() {
@Override
public void process(Record<String, String> record) {
// 1. 消费订单消息
Order order = parseOrder(record.value());
// 2. 处理业务逻辑
InventoryResult result = inventoryService.deduct(order);
// 3. 输出结果(在同一事务中)
context().forward(new Record<>(order.getId(), result.toString()));
}
})
.to("inventory-results");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

手动实现 Read-Process-Commit:

// 手动实现消费端 Exactly-Once
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
consumer.subscribe(Collections.singletonList("orders"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
// 处理消息
String result = processOrder(record.value());
// 输出结果
producer.send(new ProducerRecord<>("results", record.key(), result));
}
// 在同一事务中提交 offset
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
offsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
}
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
}
}

六、RocketMQ 事务消息深入#

6.1 半消息机制#

RocketMQ 的事务消息通过”半消息”实现本地事务与消息发送的原子性:

sequenceDiagram participant P as 生产者 participant B as Broker participant DB as 本地数据库 participant C as 消费者 P->>B: 1. 发送半消息(消费者不可见) B->>B: 存入半消息 Topic<br/>RMQ_SYS_TRANS_HALF_TOPIC B-->>P: 半消息发送成功 P->>DB: 2. 执行本地事务<br/>(如:创建订单) alt 本地事务成功 P->>B: 3a. 提交消息(Commit) B->>B: 将消息移到目标 Topic B->>C: 消费者可见,正常消费 else 本地事务失败 P->>B: 3b. 回滚消息(Rollback) B->>B: 删除半消息 C--xC: 消费者不会收到 else 状态未知(网络超时) Note over B: 4. 回查本地事务状态 B->>P: checkLocalTransaction P->>DB: 查询本地事务结果 P->>B: 返回 Commit 或 Rollback end

6.2 RocketMQ 事务消息代码实现#

// RocketMQ 事务消息完整实现
TransactionMQProducer producer = new TransactionMQProducer("order-producer-group");
producer.setNamesrvAddr("localhost:9876");
// 事务回查线程池
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000),
r -> new Thread(r, "transaction-check-thread")
);
producer.setExecutorService(executorService);
// 事务监听器
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderId = msg.getKeys();
try {
// 本地事务:创建订单
Order order = parseOrder(msg.getBody());
orderService.createOrder(order);
log.info("订单 {} 创建成功,提交消息", orderId);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (DuplicateKeyException e) {
// 订单已存在(幂等),提交消息
log.info("订单 {} 已存在,提交消息", orderId);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
log.error("订单 {} 创建失败,回滚消息", orderId, e);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 回查本地事务状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getKeys();
Order order = orderService.getById(orderId);
if (order != null) {
log.info("回查:订单 {} 存在,提交消息", orderId);
return LocalTransactionState.COMMIT_MESSAGE;
} else {
log.warn("回查:订单 {} 不存在,回滚消息", orderId);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
});
producer.start();
// 发送事务消息
Message msg = new Message("order-events", "TagA",
"{\"orderId\":\"ORD-001\",\"amount\":99.9}".getBytes());
msg.setKeys("ORD-001"); // 业务唯一ID,用于回查
producer.sendMessageInTransaction(msg, null);

6.3 RocketMQ 事务消息的注意事项#

注意事项说明建议
回查频率默认 15 秒回查一次,最多回查 15 次根据业务调整 transactionCheckMaxtransactionCheckInterval
本地事务耗时本地事务不能太长,否则触发回查控制在 10s 以内
半消息存储半消息存在特殊 Topic,占用 Broker 存储定期清理已提交/回滚的半消息
幂等性回查可能多次调用 checkLocalTransaction本地事务查询必须幂等
事务状态UNKNOW 状态会触发回查尽量返回 COMMIT 或 ROLLBACK,避免 UNKNOW

七、RabbitMQ 的投递保证#

7.1 生产者确认(Publisher Confirms)#

RabbitMQ 通过 Publisher Confirms 机制保证消息到达 Broker:

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.confirm_delivery()
try:
channel.basic_publish(
exchange='',
routing_key='orders',
body='order-123',
properties=pika.BasicProperties(delivery_mode=2) # 持久化
)
print("消息确认到达 Broker")
except pika.exceptions.UnroutableError:
print("消息无法路由,可能丢失")
// Java: RabbitMQ 异步确认
channel.confirmSelect(); // 开启确认模式
// 添加确认回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) {
// 消息确认到达 Broker
pendingMessages.remove(deliveryTag);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) {
// 消息被拒绝,需要重发
Message msg = pendingMessages.get(deliveryTag);
resend(msg);
}
});

7.2 消费者确认与重投递#

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='dlx', exchange_type='direct')
channel.queue_declare(queue='orders-dlq') # 死信队列
channel.queue_bind(queue='orders-dlq', exchange='dlx', routing_key='orders')
args = {
'x-dead-letter-exchange': 'dlx',
'x-dead-letter-routing-key': 'orders',
'x-message-ttl': 60000, # 消息 TTL 60秒
}
channel.queue_declare(queue='orders', durable=True, arguments=args)
max_retries = 3
def callback(ch, method, properties, body):
headers = properties.headers or {}
retry_count = headers.get('x-retry-count', 0)
try:
process_order(body)
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
if retry_count < max_retries:
ch.basic_publish(
exchange='',
routing_key='orders',
body=body,
properties=pika.BasicProperties(
delivery_mode=2,
headers={'x-retry-count': retry_count + 1}
)
)
ch.basic_ack(delivery_tag=method.delivery_tag) # 确认原消息
else:
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
log.error(f"消息处理失败,进入死信队列: {body}")
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='orders', on_message_callback=callback)
channel.start_consuming()

7.3 RabbitMQ 投递保证级别#

配置语义说明
发布不确认 + 自动 ACKAt-Most-Once最快,可能丢消息
发布确认 + 手动 ACKAt-Least-Once推荐,可能重复
发布确认 + 手动 ACK + 幂等消费等效 Exactly-Once最可靠,需要业务层配合
graph TB subgraph "RabbitMQ 可靠性配置" L1["Level 1: At-Most-Once<br/>不确认 + auto_ack"] --> L2["Level 2: At-Least-Once<br/>Publisher Confirm + 手动 ACK"] L2 --> L3["Level 3: 等效 Exactly-Once<br/>+ 幂等消费"] end subgraph "性能影响" P1["吞吐量最高"] P2["吞吐量下降 ~20%"] P3["吞吐量下降 ~30%<br/>+ 去重开销"] end L1 -.-> P1 L2 -.-> P2 L3 -.-> P3 style L1 fill:#e3f2fd,stroke:#1565c0 style L2 fill:#fff9c4,stroke:#f9a825 style L3 fill:#c8e6c9,stroke:#2e7d32

八、各系统的投递保证对比#

维度KafkaRabbitMQRocketMQ
At-Most-Onceacks=0 + auto commitauto_ack
At-Least-Onceacks=all + 手动 commitPublisher Confirm + 手动 ACK默认语义
Exactly-Once(生产端)幂等生产者 (PID+SeqNum)
Exactly-Once(事务)事务 API (transactional.id)事务消息(半消息)
Exactly-Once(消费端)Kafka Streams / Read-Process-Commit幂等消费幂等消费
消息持久化日志文件(顺序写入)持久化队列(delivery_mode=2)CommitLog(顺序写入)
副本机制ISR 副本同步镜像队列(HA)主从同步
死信处理无内置(需自行实现)DLX + TTL延迟等级 + 死信 Topic

8.1 Kafka 投递保证配置速查#

语义生产者配置消费者配置
At-Most-Onceacks=0enable.auto.commit=true
At-Least-Onceacks=all, retries=MAXenable.auto.commit=false, 手动 commit
Exactly-Once(幂等)enable.idempotence=true同 At-Least-Once
Exactly-Once(事务)transactional.id=xxxisolation.level=read_committed

8.2 RabbitMQ 投递保证配置速查#

语义生产者消费者
At-Most-Once不开 confirmauto_ack=true
At-Least-Onceconfirm_select() + 持久化auto_ack=false + 手动 ACK
等效 Exactly-Once同 At-Least-Once同 At-Least-Once + 幂等去重

九、语义选择决策指南#

9.1 决策流程#

graph TB START["消息丢失可接受?"] -->|"是"| AMO["At-Most-Once<br/>日志、指标、埋点"] START -->|"否"| Q1["消息重复可接受?"] Q1 -->|"是"| ALO["At-Least-Once<br/>大多数业务场景"] Q1 -->|"否"| Q2["需要跨分区原子性?"] Q2 -->|"是"| KAFKA_TX["Kafka 事务<br/>流处理、消费-处理-生产"] Q2 -->|"否"| Q3["需要本地事务+消息一致性?"] Q3 -->|"是"| ROCKET_TX["RocketMQ 事务消息<br/>订单创建+消息发送"] Q3 -->|"否"| IDEMP["At-Least-Once + 幂等<br/>最通用的选择"] style AMO fill:#e3f2fd,stroke:#1565c0 style ALO fill:#fff9c4,stroke:#f9a825 style KAFKA_TX fill:#c8e6c9,stroke:#2e7d32 style ROCKET_TX fill:#f3e5f5,stroke:#6a1b9a style IDEMP fill:#fff9c4,stroke:#f9a825

9.2 场景与语义匹配#

业务场景推荐语义实现方式理由
日志收集At-Most-OnceKafka acks=0丢几条日志无影响,吞吐量优先
监控指标At-Most-OnceKafka acks=0下一个采集周期覆盖,丢失可接受
用户行为埋点At-Most-OnceKafka acks=1统计维度允许误差
订单创建At-Least-Once + 幂等Kafka acks=all + 唯一约束订单不能丢,重复用唯一约束去重
支付通知At-Least-Once + 幂等RabbitMQ Confirm + 去重表支付不能丢,重复用去重表处理
库存扣减At-Least-Once + 乐观锁Kafka + 版本号库存不能丢,重复用乐观锁拦截
转账交易Exactly-OnceKafka 事务金额不能出错,必须原子性
计费系统Exactly-OnceKafka 事务重复计费不可接受
订单创建+消息发送Exactly-OnceRocketMQ 事务消息本地事务与消息必须一致

9.3 性能对比#

语义Kafka 吞吐量(相对值)RabbitMQ 吞吐量(相对值)延迟影响
At-Most-Once100%100%
At-Least-Once~85%~80%+1-5ms
Exactly-Once(幂等)~90%+1-2ms
Exactly-Once(事务)~50-80%+10-50ms
Caution

Exactly-Once 的性能代价不是线性的——事务越小,开销占比越大。如果你的事务只包含 1-2 条消息,事务开销可能占到 50% 以上。如果包含 100 条消息,开销可能只有 10-20%。在设计事务时,尽量批量提交,减少事务频率。

十、总结#

语义丢失风险重复风险实现代价推荐场景
At-Most-Once日志、指标
At-Least-Once通用业务(+幂等)
Exactly-Once金融、计费
实现路径解决的问题性能影响适用系统
幂等生产者生产端重复写入~5-10%Kafka
事务消息跨分区原子性 / 本地事务+消息一致性~20-50%Kafka / RocketMQ
消费者去重消费端重复处理取决于实现通用
Tip

大多数系统应该选择 At-Least-Once + 幂等性——它比 Exactly-Once 简单得多,性能也好得多。只有在”重复处理不可接受”的场景(如金融交易)才需要 Exactly-Once。选择语义时,从最简单的开始,只在必要时升级。

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

消息语义:at-most-once 到 exactly-once
https://blog.souloss.com/posts/messaging/message-semantics/
作者
Souloss
发布于
2026-03-13
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时