mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2275 字
6 分钟
消息与数据库一致性
2026-05-08

用户付了钱,订单状态更新了,但支付通知没发出去——下游支付服务不知道这笔交易,对账时发现差额。这是双写问题的经典表现:数据库写入和消息发送是两个独立操作,无法原子完成。先写数据库再发消息,消息可能丢失;先发消息再写数据库,数据库失败则消息多余;在数据库事务里发消息,事务回滚但消息已经发出。三种顺序,三种不一致。解决双写问题的方案——本地消息表、事务性发件箱、CDC——本质上都是在用不同的方式制造”伪原子性”。

一、双写问题#

1.1 什么是双写?#

双写是指一个业务操作需要同时写入数据库和发送消息:

graph LR APP["应用服务"] -->|"1. 写入 DB"| DB["数据库"] APP -->|"2. 发送消息"| MQ["消息队列"]
步骤操作可能失败
步骤 1写入数据库数据库宕机、超时
步骤 2发送消息消息系统宕机、网络断开

1.2 双写的三种不一致场景#

场景步骤 1步骤 2后果
消息丢失DB 写入成功消息发送失败数据库有数据,下游不知道
消息多余DB 写入失败消息发送成功下游收到不存在的数据
顺序错误DB 写入慢消息先到下游先收到消息,但 DB 还没更新
Note

双写问题的本质是:数据库事务和消息发送是两个独立的操作,无法原子性完成。数据库事务保证 ACID,消息发送保证至少一次投递,但两者之间没有事务协调器。

1.3 常见的错误方案#

方案代码问题
先 DB 后消息db.save(); mq.send();消息发送失败 → 丢失
先消息后 DBmq.send(); db.save();DB 失败 → 消息多余
DB 事务中发消息@Transactional { db.save(); mq.send(); }消息发送成功但事务回滚 → 消息多余
异步发送db.save(); async.send();应用崩溃 → 消息丢失

1.4 双写问题的形式化分析#

更严谨地分析双写失败的所有可能。假设操作 A(写 DB)和操作 B(发消息)顺序执行:

graph TD START["执行 A:写 DB"] --> A_RESULT{"A 成功?"} A_RESULT -->|"成功"| EXEC_B["执行 B:发消息"] A_RESULT -->|"失败"| FAIL_A["A 失败<br/>安全:DB 和 MQ 都没变"] EXEC_B --> B_RESULT{"B 成功?"} B_RESULT -->|"成功"| SUCCESS["一致:DB 和 MQ 都更新"] B_RESULT -->|"失败"| INCONSISTENT["不一致:<br/>DB 已更新,MQ 未更新"] style FAIL_A fill:#c8e6c9,stroke:#2e7d32 style SUCCESS fill:#c8e6c9,stroke:#2e7d32 style INCONSISTENT fill:#ffcdd2,stroke:#c62828

关键观察:A 成功后 B 失败 是唯一的不一致场景。所有解决方案的核心思路都是:要么让 B 不可能失败(本地消息表),要么让 B 可以安全重试(幂等 + 重试),要么消除 B 作为独立步骤的需要(CDC)。

二、本地消息表#

2.1 原理#

本地消息表将消息持久化到业务数据库中,利用数据库事务保证业务操作和消息记录的原子性:

sequenceDiagram participant C as Client participant S as 服务 participant DB as 数据库 participant MQ as 消息队列 C->>S: 业务请求 S->>DB: 1. 开启事务 S->>DB: 2. 写入业务数据 S->>DB: 3. 写入消息表(状态=待发送) S->>DB: 4. 提交事务 S->>MQ: 5. 发送消息 S->>DB: 6. 更新消息状态=已发送 Note over S,MQ: 步骤 5 失败? S->>S: 定时任务扫描消息表 S->>MQ: 7. 重新发送待发送消息

2.2 消息表设计#

-- 本地消息表
CREATE TABLE outbox_messages (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
topic VARCHAR(255) NOT NULL COMMENT '消息 Topic',
message_key VARCHAR(255) NOT NULL COMMENT '消息 Key(用于分区路由和幂等)',
message_value TEXT NOT NULL COMMENT '消息内容(JSON)',
status ENUM('PENDING', 'SENT', 'FAILED') DEFAULT 'PENDING' COMMENT '发送状态',
retry_count INT DEFAULT 0 COMMENT '重试次数',
max_retries INT DEFAULT 5 COMMENT '最大重试次数',
next_retry_at TIMESTAMP NULL COMMENT '下次重试时间(指数退避)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_status_next_retry (status, next_retry_at),
INDEX idx_created (created_at)
) ENGINE=InnoDB COMMENT='本地消息表';

2.3 完整实现(Java)#

@Service
public class OrderService {
private final OrderRepository orderRepository;
private final OutboxMessageRepository outboxRepository;
private final KafkaTemplate<String, String> kafkaTemplate;
@Transactional
public Order createOrder(CreateOrderRequest request) {
// 1. 写入业务数据
Order order = new Order();
order.setId(UUID.randomUUID().toString());
order.setCustomerId(request.getCustomerId());
order.setAmount(request.getAmount());
order.setStatus(OrderStatus.CREATED);
order.setItems(request.getItems());
orderRepository.save(order);
// 2. 写入消息表(同一事务,保证原子性)
OutboxMessage message = new OutboxMessage();
message.setTopic("order-events");
message.setMessageKey(order.getId()); // orderId 作为 Key,保证分区有序
message.setMessageValue(JSON.toJSONString(order));
message.setStatus(MessageStatus.PENDING);
message.setRetryCount(0);
message.setMaxRetries(5);
message.setNextRetryAt(LocalDateTime.now()); // 立即可发送
outboxRepository.save(message);
return order;
}
// 定时任务:扫描待发送消息
@Scheduled(fixedRate = 5000)
public void sendPendingMessages() {
List<OutboxMessage> pending = outboxRepository
.findByStatusAndNextRetryAtBefore(MessageStatus.PENDING, LocalDateTime.now());
for (OutboxMessage msg : pending) {
try {
// 同步发送,确保确认后再更新状态
kafkaTemplate.send(msg.getTopic(), msg.getMessageKey(), msg.getMessageValue())
.get(10, TimeUnit.SECONDS);
msg.setStatus(MessageStatus.SENT);
} catch (Exception e) {
msg.setRetryCount(msg.getRetryCount() + 1);
if (msg.getRetryCount() >= msg.getMaxRetries()) {
msg.setStatus(MessageStatus.FAILED);
// 告警:消息发送最终失败
alertService.alert("消息发送失败: " + msg.getId());
} else {
// 指数退避:1s, 2s, 4s, 8s, 16s
long delaySeconds = (long) Math.pow(2, msg.getRetryCount());
msg.setNextRetryAt(LocalDateTime.now().plusSeconds(delaySeconds));
}
}
outboxRepository.save(msg);
}
}
// 定期清理已发送消息(保留 7 天)
@Scheduled(cron = "0 0 3 * * ?")
public void cleanSentMessages() {
outboxRepository.deleteByStatusAndUpdatedAtBefore(
MessageStatus.SENT,
LocalDateTime.now().minusDays(7)
);
}
}

2.4 Python 实现#

import json
import uuid
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Column, String, Integer, Enum, DateTime, Text
from sqlalchemy.orm import sessionmaker, declarative_base
from kafka import KafkaProducer
Base = declarative_base()
class OutboxMessage(Base):
__tablename__ = 'outbox_messages'
id = Column(Integer, primary_key=True, autoincrement=True)
topic = Column(String(255), nullable=False)
message_key = Column(String(255), nullable=False)
message_value = Column(Text, nullable=False)
status = Column(Enum('PENDING', 'SENT', 'FAILED'), default='PENDING')
retry_count = Column(Integer, default=0)
next_retry_at = Column(DateTime, default=datetime.now)
class OrderService:
def __init__(self, db_url: str, kafka_servers: list):
self.engine = create_engine(db_url)
self.Session = sessionmaker(bind=self.engine)
self.producer = KafkaProducer(
bootstrap_servers=kafka_servers,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
key_serializer=lambda k: k.encode('utf-8'),
acks='all',
retries=3
)
def create_order(self, customer_id: str, amount: float, items: list):
session = self.Session()
try:
order_id = str(uuid.uuid4())
# 1. 写入业务数据
session.execute(
"INSERT INTO orders (id, customer_id, amount, status) "
"VALUES (:id, :cid, :amt, 'CREATED')",
{"id": order_id, "cid": customer_id, "amt": amount}
)
# 2. 写入消息表(同一事务)
session.execute(
"INSERT INTO outbox_messages (topic, message_key, message_value, status) "
"VALUES ('order-events', :key, :value, 'PENDING')",
{
"key": order_id,
"value": json.dumps({
"orderId": order_id, "customerId": customer_id,
"amount": amount, "items": items,
"eventType": "ORDER_CREATED"
})
}
)
session.commit()
return order_id
except Exception:
session.rollback()
raise
def send_pending_messages(self):
session = self.Session()
messages = session.execute(
"SELECT * FROM outbox_messages "
"WHERE status = 'PENDING' AND next_retry_at <= :now "
"LIMIT 100",
{"now": datetime.now()}
).fetchall()
for msg in messages:
try:
future = self.producer.send(
msg.topic, key=msg.message_key, value=json.loads(msg.message_value)
)
future.get(timeout=10) # 同步等待确认
session.execute(
"UPDATE outbox_messages SET status = 'SENT' WHERE id = :id",
{"id": msg.id}
)
except Exception:
new_retry = msg.retry_count + 1
if new_retry >= 5:
session.execute(
"UPDATE outbox_messages SET status = 'FAILED' WHERE id = :id",
{"id": msg.id}
)
else:
delay = 2 ** new_retry
session.execute(
"UPDATE outbox_messages SET retry_count = :rc, "
"next_retry_at = :nrt WHERE id = :id",
{"rc": new_retry, "nrt": datetime.now() + timedelta(seconds=delay), "id": msg.id}
)
session.commit()

2.5 优缺点#

优点缺点
实现简单消息表与业务表耦合
利用数据库事务保证原子性定时轮询有延迟
消息不丢(持久化到 DB)需要定期清理已发送消息
支持重试高并发时消息表成为瓶颈
不依赖额外组件同一数据库增加存储压力

三、事务性发件箱(Transactional Outbox)#

3.1 原理#

事务性发件箱是本地消息表的改进版,通过 CDC(Change Data Capture)替代定时轮询:

graph TB APP["应用服务"] -->|"1. 事务写入"| DB["数据库<br/>业务表 + Outbox 表"] DB -->|"2. CDC 捕获变更"| CDC["Debezium / Canal"] CDC -->|"3. 转发消息"| MQ["Kafka"] MQ --> C["消费者"]
组件说明替代方案
Outbox 表存储待发送消息业务事件表
CDC 工具捕获数据库变更Debezium、Canal、Maxwell
消息队列传递消息Kafka、RocketMQ

3.2 Debezium 实现#

// Debezium Connector 配置
{
"name": "order-outbox-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "order_service",
"database.include.list": "order_db",
"table.include.list": "order_db.outbox_messages",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.order",
"transforms": "outbox",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"transforms.outbox.table.field.event.id": "id",
"transforms.outbox.table.field.event.key": "message_key",
"transforms.outbox.table.field.event.type": "event_type",
"transforms.outbox.table.field.event.payload": "message_value"
}
}
// 应用端:只需写入 Outbox 表
@Service
public class OrderService {
@Transactional
public void createOrder(Order order) {
// 1. 写入业务数据
orderRepository.save(order);
// 2. 写入 Outbox 表(同一事务)
OutboxEvent event = new OutboxEvent();
event.setAggregateType("Order");
event.setAggregateId(order.getId());
event.setEventType("ORDER_CREATED");
event.setPayload(JSON.toJSONString(order));
outboxRepository.save(event);
// Debezium 会自动捕获 Outbox 表的变更并发送到 Kafka
// 无需手动发送消息!
}
}

3.3 Outbox 表设计#

-- 标准 Outbox 表(Debezium 推荐格式)
CREATE TABLE outbox_events (
id VARCHAR(255) PRIMARY KEY, -- 事件 ID(UUID)
aggregatetype VARCHAR(255) NOT NULL, -- 聚合类型(如 Order)
aggregateid VARCHAR(255) NOT NULL, -- 聚合 ID(如订单 ID)
eventtype VARCHAR(255) NOT NULL, -- 事件类型(如 ORDER_CREATED)
payload JSON NOT NULL, -- 事件负载
created_at TIMESTAMP DEFAULT NOW(), -- 创建时间
INDEX idx_aggregate (aggregatetype, aggregateid),
INDEX idx_created (created_at)
);
-- 清理策略:定期删除已处理的事件
-- Debezium 读取后可以安全删除(CDC 已捕获)
DELETE FROM outbox_events WHERE created_at < DATE_SUB(NOW(), INTERVAL 7 DAY);

3.4 事务性发件箱的完整流程#

sequenceDiagram participant Client participant OrderService participant DB as MySQL participant Debezium participant Kafka participant PaymentService Client->>OrderService: POST /orders OrderService->>DB: BEGIN TRANSACTION OrderService->>DB: INSERT INTO orders (...) OrderService->>DB: INSERT INTO outbox_events (...) OrderService->>DB: COMMIT OrderService-->>Client: 201 Created Note over DB,Debezium: 异步 CDC 捕获 DB-->>Debezium: Binlog Event (INSERT outbox_events) Debezium->>Debezium: EventRouter SMT<br/>提取 payload + 路由到 Topic Debezium->>Kafka: Produce to order-events Note over Kafka,PaymentService: 消费者处理 Kafka-->>PaymentService: Consume ORDER_CREATED PaymentService->>PaymentService: 创建待支付记录

3.5 本地消息表 vs 事务性发件箱#

维度本地消息表事务性发件箱
消息发送定时轮询CDC 实时捕获
延迟秒级毫秒级
额外组件Debezium/Canal
实现复杂度
可靠性
吞吐量中(轮询开销)高(流式处理)
运维复杂度中(需维护 CDC)
消息清理需手动清理CDC 捕获后可安全删除
Warning

事务性发件箱引入了 CDC 组件,增加了运维复杂度。如果团队没有 CDC 的运维经验,建议先从本地消息表开始,等业务规模增长后再引入 CDC。

四、CDC 同步#

4.1 CDC 原理#

CDC(Change Data Capture)通过读取数据库的 Binlog/Redo Log 捕获数据变更:

graph LR APP["应用服务"] -->|"写入"| DB["数据库"] DB -->|"Binlog"| CDC["CDC 工具<br/>(Debezium/Canal)"] CDC -->|"变更事件"| MQ["Kafka"] MQ --> SINK["下游服务<br/>ES/Redis/数据仓库"]
CDC 工具支持数据库特点
DebeziumMySQL/PG/Oracle/SQL Server开源、Kafka Connect 集成
CanalMySQL阿里开源、中文社区
MaxwellMySQL轻量、JSON 输出
AWS DMS多种云服务、托管

4.2 Debezium 配置示例#

{
"name": "order-cdc-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "password",
"database.server.id": "184054",
"database.server.name": "order_service",
"database.include.list": "order_db",
"table.include.list": "order_db.orders,order_db.order_items",
"snapshot.mode": "schema_only",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.order"
}
}
# 注册 Debezium Connector
curl -X POST -H "Content-Type: application/json" \
--data @debezium-order.json \
http://localhost:8083/connectors
# 查看 Connector 状态
curl http://localhost:8083/connectors/order-cdc-connector/status

4.3 CDC 消费端#

// 消费 Debezium 变更事件
@KafkaListener(topics = "order_service.order_db.orders")
public void handleOrderChange(ConsumerRecord<String, String> record) {
JSONObject event = JSON.parseObject(record.value());
JSONObject payload = event.getJSONObject("payload");
String operation = payload.getString("op");
JSONObject before = payload.getJSONObject("before");
JSONObject after = payload.getJSONObject("after");
switch (operation) {
case "c" -> { // Create
syncToElasticsearch(after);
syncToRedis(after);
}
case "u" -> { // Update
syncToElasticsearch(after);
invalidateRedisCache(after);
}
case "d" -> { // Delete
deleteFromElasticsearch(before.getString("id"));
deleteFromRedis(before.getString("id"));
}
}
}

4.4 CDC 的注意事项#

注意事项说明解决方案
Binlog 格式必须使用 ROW 格式binlog_format=ROW
Binlog 保留CDC 延迟时 Binlog 可能被清理增大 expire_logs_days
Schema 变更表结构变更可能导致 CDC 中断使用 Schema History Topic
全量同步首次启动需要全量快照snapshot.mode=schema_only 跳过
消费者幂等CDC 事件可能重复消费使用主键做幂等

五、Saga 模式#

5.1 为什么需要 Saga?#

当业务操作涉及多个服务,每个服务都有自己的数据库时,本地消息表只能保证”单个服务”的 DB+MQ 一致性。跨服务的一致性需要 Saga 模式——通过编排一系列本地事务,每个本地事务发送事件触发下一个步骤,失败时执行补偿操作。

5.2 Saga 编排模式#

graph TD START["创建订单"] --> STEP1["扣减库存"] STEP1 -->|"成功"| STEP2["扣减余额"] STEP1 -->|"失败"| COMP1["取消订单"] STEP2 -->|"成功"| STEP3["确认订单"] STEP2 -->|"失败"| COMP2["恢复库存<br/>+ 取消订单"] STEP3 --> SUCCESS["订单完成"] COMP1 --> FAIL["订单失败"] COMP2 --> FAIL style SUCCESS fill:#c8e6c9,stroke:#2e7d32 style FAIL fill:#ffcdd2,stroke:#c62828

5.3 Saga 实现(编排式)#

// Saga 编排器:定义事务步骤和补偿操作
@Service
public class OrderSagaOrchestrator {
private final KafkaTemplate<String, String> kafkaTemplate;
public void executeOrderSaga(Order order) {
// 步骤 1:创建订单(已在 OrderService 中完成,Outbox 已发送)
// 等待库存服务消费 ORDER_CREATED 事件
// Saga 状态机
sagaStateRepository.save(new SagaState(order.getId(), "INVENTORY_DEDUCTING"));
}
// 库存扣减成功
@KafkaListener(topics = "inventory-events", groupId = "saga-orchestrator")
public void onInventoryEvent(ConsumerRecord<String, String> record) {
InventoryEvent event = JSON.parseObject(record.value(), InventoryEvent.class);
SagaState saga = sagaStateRepository.findByOrderId(event.getOrderId());
switch (event.getEventType()) {
case "INVENTORY_DEDUCTED" -> {
// 库存扣减成功 → 触发支付
saga.setState("PAYMENT_PROCESSING");
sendPaymentCommand(event.getOrderId());
}
case "INVENTORY_DEDUCT_FAILED" -> {
// 库存扣减失败 → 取消订单
saga.setState("COMPENSATING");
sendCancelOrderCommand(event.getOrderId());
}
}
sagaStateRepository.save(saga);
}
// 支付结果
@KafkaListener(topics = "payment-events", groupId = "saga-orchestrator")
public void onPaymentEvent(ConsumerRecord<String, String> record) {
PaymentEvent event = JSON.parseObject(record.value(), PaymentEvent.class);
SagaState saga = sagaStateRepository.findByOrderId(event.getOrderId());
switch (event.getEventType()) {
case "PAYMENT_COMPLETED" -> {
// 支付成功 → 确认订单
saga.setState("COMPLETED");
sendConfirmOrderCommand(event.getOrderId());
}
case "PAYMENT_FAILED" -> {
// 支付失败 → 补偿:恢复库存 + 取消订单
saga.setState("COMPENSATING");
sendRestoreInventoryCommand(event.getOrderId());
sendCancelOrderCommand(event.getOrderId());
}
}
sagaStateRepository.save(saga);
}
private void sendPaymentCommand(String orderId) {
// 通过 Outbox 发送支付命令
outboxService.send("payment-commands", orderId,
JSON.toJSONString(Map.of("orderId", orderId, "command", "PROCESS_PAYMENT")));
}
}

5.4 Saga vs 其他方案#

维度本地消息表事务性发件箱Saga2PC/XA
一致性范围单服务单服务跨服务跨服务
一致性级别最终一致最终一致最终一致强一致
延迟秒级毫秒级秒级秒级
实现复杂度
补偿操作不需要不需要必须实现自动回滚
性能影响高(锁等待)
适用场景单服务 DB+MQ单服务低延迟跨服务编排强一致需求
Note

Saga 模式的核心代价是”补偿操作”——每个正向操作都必须有对应的补偿操作。如果业务操作不可逆(如银行转账已到账),Saga 就不适用,需要考虑 TCC 或人工对账。

六、RocketMQ 事务消息#

6.1 原理#

RocketMQ 的事务消息通过两阶段提交 + 回查机制,保证本地事务和消息发送的原子性:

sequenceDiagram participant P as Producer participant RMQ as RocketMQ Broker participant DB as 数据库 P->>RMQ: 1. 发送半消息(Half Message) RMQ-->>P: 半消息发送成功 P->>DB: 2. 执行本地事务 alt 本地事务成功 P->>RMQ: 3a. 提交消息(Commit) RMQ->>RMQ: 消息可见,投递给消费者 else 本地事务失败 P->>RMQ: 3b. 回滚消息(Rollback) RMQ->>RMQ: 删除消息,不投递 end Note over P,RMQ: 如果 3a/3b 未到达(网络故障) RMQ->>P: 4. 回查本地事务状态 P->>DB: 查询事务结果 P->>RMQ: 返回 Commit 或 Rollback

6.2 实现#

// 使用 RocketMQ 事务消息替代本地消息表
TransactionMQProducer producer = new TransactionMQProducer("order-producer-group");
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder(parseOrder(msg));
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Order order = orderService.getById(parseOrderId(msg));
return order != null ? COMMIT_MESSAGE : ROLLBACK_MESSAGE;
}
});

七、方案全景对比#

7.1 一致性方案选择决策树#

graph TD START["DB + MQ 一致性方案选择"] --> Q1{"涉及几个服务?"} Q1 -->|"单服务"| Q2{"延迟要求?"} Q1 -->|"多服务"| Q3{"需要强一致?"} Q2 -->|"秒级可接受"| LOCAL["本地消息表"] Q2 -->|"毫秒级"| Q4{"有 CDC 运维经验?"} Q4 -->|"是"| OUTBOX["事务性发件箱"] Q4 -->|"否"| Q5{"用 RocketMQ?"} Q5 -->|"是"| TXMSG["RocketMQ 事务消息"] Q5 -->|"否"| LOCAL2["本地消息表<br/>(优化轮询频率)"] Q3 -->|"是"| XA[" 2PC/XA<br/>(慎用)"] Q3 -->|"否"| SAGA["Saga 模式"] style LOCAL fill:#c8e6c9,stroke:#2e7d32 style OUTBOX fill:#c8e6c9,stroke:#2e7d32 style TXMSG fill:#c8e6c9,stroke:#2e7d32 style SAGA fill:#fff9c4,stroke:#f9a825 style XA fill:#ffcdd2,stroke:#c62828

7.2 全面对比表#

方案一致性延迟复杂度额外组件适用场景
本地消息表最终一致秒级单服务、简单场景
事务性发件箱最终一致毫秒级Debezium单服务、低延迟
RocketMQ 事务消息最终一致毫秒级RocketMQ单服务、已用 RocketMQ
Saga最终一致秒级Saga 编排器跨服务编排
2PC/XA强一致秒级事务协调器强一致刚需
Kafka 事务 APIExactly-Once毫秒级KafkaKafka 内部一致性
Tip

大多数场景下,本地消息表或事务性发件箱就足够了。RocketMQ 事务消息是另一个好选择,但需要使用 RocketMQ。Saga 适用于跨服务编排,但每个操作必须有补偿。2PC/XA 适用于强一致场景,但性能代价大。选择方案时,优先考虑”最终一致 + 简单实现”,而非”强一致 + 复杂实现”。

7.3 生产环境推荐组合#

团队规模推荐方案原因
小团队(< 10 人)本地消息表实现简单,无额外组件
中团队(10-50 人)事务性发件箱低延迟,CDC 可复用
大团队(50+ 人)事务性发件箱 + Saga单服务用发件箱,跨服务用 Saga
已用 RocketMQRocketMQ 事务消息无需额外组件
已用 Kafka事务性发件箱 + DebeziumKafka 生态原生集成

八、总结#

上一章深入探讨了消息队列选型决策。

维度关键要点
双写问题DB 和 MQ 无法原子操作,导致消息丢失或多余
本地消息表同一事务写入业务数据和消息表,定时轮询发送
事务性发件箱Outbox 表 + CDC 实时捕获,替代定时轮询
CDC 同步Debezium/Canal 捕获 Binlog,实时同步到下游
RocketMQ 事务消息两阶段提交 + 回查,保证 DB 和消息一致
Saga 模式跨服务编排,正向操作 + 补偿操作
方案选择优先本地消息表(简单),需要低延迟用发件箱+CDC,跨服务用 Saga

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

消息与数据库一致性
https://blog.souloss.com/posts/messaging/message-and-database-consistency/
作者
Souloss
发布于
2026-05-08
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时