用户付了钱,订单状态更新了,但支付通知没发出去——下游支付服务不知道这笔交易,对账时发现差额。这是双写问题的经典表现:数据库写入和消息发送是两个独立操作,无法原子完成。先写数据库再发消息,消息可能丢失;先发消息再写数据库,数据库失败则消息多余;在数据库事务里发消息,事务回滚但消息已经发出。三种顺序,三种不一致。解决双写问题的方案——本地消息表、事务性发件箱、CDC——本质上都是在用不同的方式制造”伪原子性”。
一、双写问题
1.1 什么是双写?
双写是指一个业务操作需要同时写入数据库和发送消息:
| 步骤 | 操作 | 可能失败 |
|---|---|---|
| 步骤 1 | 写入数据库 | 数据库宕机、超时 |
| 步骤 2 | 发送消息 | 消息系统宕机、网络断开 |
1.2 双写的三种不一致场景
| 场景 | 步骤 1 | 步骤 2 | 后果 |
|---|---|---|---|
| 消息丢失 | DB 写入成功 | 消息发送失败 | 数据库有数据,下游不知道 |
| 消息多余 | DB 写入失败 | 消息发送成功 | 下游收到不存在的数据 |
| 顺序错误 | DB 写入慢 | 消息先到 | 下游先收到消息,但 DB 还没更新 |
双写问题的本质是:数据库事务和消息发送是两个独立的操作,无法原子性完成。数据库事务保证 ACID,消息发送保证至少一次投递,但两者之间没有事务协调器。
1.3 常见的错误方案
| 方案 | 代码 | 问题 |
|---|---|---|
| 先 DB 后消息 | db.save(); mq.send(); | 消息发送失败 → 丢失 |
| 先消息后 DB | mq.send(); db.save(); | DB 失败 → 消息多余 |
| DB 事务中发消息 | @Transactional { db.save(); mq.send(); } | 消息发送成功但事务回滚 → 消息多余 |
| 异步发送 | db.save(); async.send(); | 应用崩溃 → 消息丢失 |
1.4 双写问题的形式化分析
更严谨地分析双写失败的所有可能。假设操作 A(写 DB)和操作 B(发消息)顺序执行:
关键观察:A 成功后 B 失败 是唯一的不一致场景。所有解决方案的核心思路都是:要么让 B 不可能失败(本地消息表),要么让 B 可以安全重试(幂等 + 重试),要么消除 B 作为独立步骤的需要(CDC)。
二、本地消息表
2.1 原理
本地消息表将消息持久化到业务数据库中,利用数据库事务保证业务操作和消息记录的原子性:
2.2 消息表设计
-- 本地消息表CREATE TABLE outbox_messages ( id BIGINT AUTO_INCREMENT PRIMARY KEY, topic VARCHAR(255) NOT NULL COMMENT '消息 Topic', message_key VARCHAR(255) NOT NULL COMMENT '消息 Key(用于分区路由和幂等)', message_value TEXT NOT NULL COMMENT '消息内容(JSON)', status ENUM('PENDING', 'SENT', 'FAILED') DEFAULT 'PENDING' COMMENT '发送状态', retry_count INT DEFAULT 0 COMMENT '重试次数', max_retries INT DEFAULT 5 COMMENT '最大重试次数', next_retry_at TIMESTAMP NULL COMMENT '下次重试时间(指数退避)', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
INDEX idx_status_next_retry (status, next_retry_at), INDEX idx_created (created_at)) ENGINE=InnoDB COMMENT='本地消息表';2.3 完整实现(Java)
@Servicepublic class OrderService { private final OrderRepository orderRepository; private final OutboxMessageRepository outboxRepository; private final KafkaTemplate<String, String> kafkaTemplate;
@Transactional public Order createOrder(CreateOrderRequest request) { // 1. 写入业务数据 Order order = new Order(); order.setId(UUID.randomUUID().toString()); order.setCustomerId(request.getCustomerId()); order.setAmount(request.getAmount()); order.setStatus(OrderStatus.CREATED); order.setItems(request.getItems()); orderRepository.save(order);
// 2. 写入消息表(同一事务,保证原子性) OutboxMessage message = new OutboxMessage(); message.setTopic("order-events"); message.setMessageKey(order.getId()); // orderId 作为 Key,保证分区有序 message.setMessageValue(JSON.toJSONString(order)); message.setStatus(MessageStatus.PENDING); message.setRetryCount(0); message.setMaxRetries(5); message.setNextRetryAt(LocalDateTime.now()); // 立即可发送 outboxRepository.save(message);
return order; }
// 定时任务:扫描待发送消息 @Scheduled(fixedRate = 5000) public void sendPendingMessages() { List<OutboxMessage> pending = outboxRepository .findByStatusAndNextRetryAtBefore(MessageStatus.PENDING, LocalDateTime.now());
for (OutboxMessage msg : pending) { try { // 同步发送,确保确认后再更新状态 kafkaTemplate.send(msg.getTopic(), msg.getMessageKey(), msg.getMessageValue()) .get(10, TimeUnit.SECONDS); msg.setStatus(MessageStatus.SENT); } catch (Exception e) { msg.setRetryCount(msg.getRetryCount() + 1); if (msg.getRetryCount() >= msg.getMaxRetries()) { msg.setStatus(MessageStatus.FAILED); // 告警:消息发送最终失败 alertService.alert("消息发送失败: " + msg.getId()); } else { // 指数退避:1s, 2s, 4s, 8s, 16s long delaySeconds = (long) Math.pow(2, msg.getRetryCount()); msg.setNextRetryAt(LocalDateTime.now().plusSeconds(delaySeconds)); } } outboxRepository.save(msg); } }
// 定期清理已发送消息(保留 7 天) @Scheduled(cron = "0 0 3 * * ?") public void cleanSentMessages() { outboxRepository.deleteByStatusAndUpdatedAtBefore( MessageStatus.SENT, LocalDateTime.now().minusDays(7) ); }}2.4 Python 实现
import jsonimport uuidfrom datetime import datetime, timedeltafrom sqlalchemy import create_engine, Column, String, Integer, Enum, DateTime, Textfrom sqlalchemy.orm import sessionmaker, declarative_basefrom kafka import KafkaProducer
Base = declarative_base()
class OutboxMessage(Base): __tablename__ = 'outbox_messages' id = Column(Integer, primary_key=True, autoincrement=True) topic = Column(String(255), nullable=False) message_key = Column(String(255), nullable=False) message_value = Column(Text, nullable=False) status = Column(Enum('PENDING', 'SENT', 'FAILED'), default='PENDING') retry_count = Column(Integer, default=0) next_retry_at = Column(DateTime, default=datetime.now)
class OrderService: def __init__(self, db_url: str, kafka_servers: list): self.engine = create_engine(db_url) self.Session = sessionmaker(bind=self.engine) self.producer = KafkaProducer( bootstrap_servers=kafka_servers, value_serializer=lambda v: json.dumps(v).encode('utf-8'), key_serializer=lambda k: k.encode('utf-8'), acks='all', retries=3 )
def create_order(self, customer_id: str, amount: float, items: list): session = self.Session() try: order_id = str(uuid.uuid4()) # 1. 写入业务数据 session.execute( "INSERT INTO orders (id, customer_id, amount, status) " "VALUES (:id, :cid, :amt, 'CREATED')", {"id": order_id, "cid": customer_id, "amt": amount} ) # 2. 写入消息表(同一事务) session.execute( "INSERT INTO outbox_messages (topic, message_key, message_value, status) " "VALUES ('order-events', :key, :value, 'PENDING')", { "key": order_id, "value": json.dumps({ "orderId": order_id, "customerId": customer_id, "amount": amount, "items": items, "eventType": "ORDER_CREATED" }) } ) session.commit() return order_id except Exception: session.rollback() raise
def send_pending_messages(self): session = self.Session() messages = session.execute( "SELECT * FROM outbox_messages " "WHERE status = 'PENDING' AND next_retry_at <= :now " "LIMIT 100", {"now": datetime.now()} ).fetchall()
for msg in messages: try: future = self.producer.send( msg.topic, key=msg.message_key, value=json.loads(msg.message_value) ) future.get(timeout=10) # 同步等待确认 session.execute( "UPDATE outbox_messages SET status = 'SENT' WHERE id = :id", {"id": msg.id} ) except Exception: new_retry = msg.retry_count + 1 if new_retry >= 5: session.execute( "UPDATE outbox_messages SET status = 'FAILED' WHERE id = :id", {"id": msg.id} ) else: delay = 2 ** new_retry session.execute( "UPDATE outbox_messages SET retry_count = :rc, " "next_retry_at = :nrt WHERE id = :id", {"rc": new_retry, "nrt": datetime.now() + timedelta(seconds=delay), "id": msg.id} ) session.commit()2.5 优缺点
| 优点 | 缺点 |
|---|---|
| 实现简单 | 消息表与业务表耦合 |
| 利用数据库事务保证原子性 | 定时轮询有延迟 |
| 消息不丢(持久化到 DB) | 需要定期清理已发送消息 |
| 支持重试 | 高并发时消息表成为瓶颈 |
| 不依赖额外组件 | 同一数据库增加存储压力 |
三、事务性发件箱(Transactional Outbox)
3.1 原理
事务性发件箱是本地消息表的改进版,通过 CDC(Change Data Capture)替代定时轮询:
| 组件 | 说明 | 替代方案 |
|---|---|---|
| Outbox 表 | 存储待发送消息 | 业务事件表 |
| CDC 工具 | 捕获数据库变更 | Debezium、Canal、Maxwell |
| 消息队列 | 传递消息 | Kafka、RocketMQ |
3.2 Debezium 实现
// Debezium Connector 配置{ "name": "order-outbox-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "password", "database.server.id": "184054", "database.server.name": "order_service", "database.include.list": "order_db", "table.include.list": "order_db.outbox_messages", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.order", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.route.topic.replacement": "${routedByValue}", "transforms.outbox.table.field.event.id": "id", "transforms.outbox.table.field.event.key": "message_key", "transforms.outbox.table.field.event.type": "event_type", "transforms.outbox.table.field.event.payload": "message_value" }}// 应用端:只需写入 Outbox 表@Servicepublic class OrderService { @Transactional public void createOrder(Order order) { // 1. 写入业务数据 orderRepository.save(order);
// 2. 写入 Outbox 表(同一事务) OutboxEvent event = new OutboxEvent(); event.setAggregateType("Order"); event.setAggregateId(order.getId()); event.setEventType("ORDER_CREATED"); event.setPayload(JSON.toJSONString(order)); outboxRepository.save(event);
// Debezium 会自动捕获 Outbox 表的变更并发送到 Kafka // 无需手动发送消息! }}3.3 Outbox 表设计
-- 标准 Outbox 表(Debezium 推荐格式)CREATE TABLE outbox_events ( id VARCHAR(255) PRIMARY KEY, -- 事件 ID(UUID) aggregatetype VARCHAR(255) NOT NULL, -- 聚合类型(如 Order) aggregateid VARCHAR(255) NOT NULL, -- 聚合 ID(如订单 ID) eventtype VARCHAR(255) NOT NULL, -- 事件类型(如 ORDER_CREATED) payload JSON NOT NULL, -- 事件负载 created_at TIMESTAMP DEFAULT NOW(), -- 创建时间
INDEX idx_aggregate (aggregatetype, aggregateid), INDEX idx_created (created_at));
-- 清理策略:定期删除已处理的事件-- Debezium 读取后可以安全删除(CDC 已捕获)DELETE FROM outbox_events WHERE created_at < DATE_SUB(NOW(), INTERVAL 7 DAY);3.4 事务性发件箱的完整流程
3.5 本地消息表 vs 事务性发件箱
| 维度 | 本地消息表 | 事务性发件箱 |
|---|---|---|
| 消息发送 | 定时轮询 | CDC 实时捕获 |
| 延迟 | 秒级 | 毫秒级 |
| 额外组件 | 无 | Debezium/Canal |
| 实现复杂度 | 低 | 中 |
| 可靠性 | 高 | 高 |
| 吞吐量 | 中(轮询开销) | 高(流式处理) |
| 运维复杂度 | 低 | 中(需维护 CDC) |
| 消息清理 | 需手动清理 | CDC 捕获后可安全删除 |
事务性发件箱引入了 CDC 组件,增加了运维复杂度。如果团队没有 CDC 的运维经验,建议先从本地消息表开始,等业务规模增长后再引入 CDC。
四、CDC 同步
4.1 CDC 原理
CDC(Change Data Capture)通过读取数据库的 Binlog/Redo Log 捕获数据变更:
| CDC 工具 | 支持数据库 | 特点 |
|---|---|---|
| Debezium | MySQL/PG/Oracle/SQL Server | 开源、Kafka Connect 集成 |
| Canal | MySQL | 阿里开源、中文社区 |
| Maxwell | MySQL | 轻量、JSON 输出 |
| AWS DMS | 多种 | 云服务、托管 |
4.2 Debezium 配置示例
{ "name": "order-cdc-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "password", "database.server.id": "184054", "database.server.name": "order_service", "database.include.list": "order_db", "table.include.list": "order_db.orders,order_db.order_items", "snapshot.mode": "schema_only", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "schema-changes.order" }}# 注册 Debezium Connectorcurl -X POST -H "Content-Type: application/json" \ --data @debezium-order.json \ http://localhost:8083/connectors
# 查看 Connector 状态curl http://localhost:8083/connectors/order-cdc-connector/status4.3 CDC 消费端
// 消费 Debezium 变更事件@KafkaListener(topics = "order_service.order_db.orders")public void handleOrderChange(ConsumerRecord<String, String> record) { JSONObject event = JSON.parseObject(record.value()); JSONObject payload = event.getJSONObject("payload");
String operation = payload.getString("op"); JSONObject before = payload.getJSONObject("before"); JSONObject after = payload.getJSONObject("after");
switch (operation) { case "c" -> { // Create syncToElasticsearch(after); syncToRedis(after); } case "u" -> { // Update syncToElasticsearch(after); invalidateRedisCache(after); } case "d" -> { // Delete deleteFromElasticsearch(before.getString("id")); deleteFromRedis(before.getString("id")); } }}4.4 CDC 的注意事项
| 注意事项 | 说明 | 解决方案 |
|---|---|---|
| Binlog 格式 | 必须使用 ROW 格式 | binlog_format=ROW |
| Binlog 保留 | CDC 延迟时 Binlog 可能被清理 | 增大 expire_logs_days |
| Schema 变更 | 表结构变更可能导致 CDC 中断 | 使用 Schema History Topic |
| 全量同步 | 首次启动需要全量快照 | snapshot.mode=schema_only 跳过 |
| 消费者幂等 | CDC 事件可能重复消费 | 使用主键做幂等 |
五、Saga 模式
5.1 为什么需要 Saga?
当业务操作涉及多个服务,每个服务都有自己的数据库时,本地消息表只能保证”单个服务”的 DB+MQ 一致性。跨服务的一致性需要 Saga 模式——通过编排一系列本地事务,每个本地事务发送事件触发下一个步骤,失败时执行补偿操作。
5.2 Saga 编排模式
5.3 Saga 实现(编排式)
// Saga 编排器:定义事务步骤和补偿操作@Servicepublic class OrderSagaOrchestrator { private final KafkaTemplate<String, String> kafkaTemplate;
public void executeOrderSaga(Order order) { // 步骤 1:创建订单(已在 OrderService 中完成,Outbox 已发送) // 等待库存服务消费 ORDER_CREATED 事件
// Saga 状态机 sagaStateRepository.save(new SagaState(order.getId(), "INVENTORY_DEDUCTING")); }
// 库存扣减成功 @KafkaListener(topics = "inventory-events", groupId = "saga-orchestrator") public void onInventoryEvent(ConsumerRecord<String, String> record) { InventoryEvent event = JSON.parseObject(record.value(), InventoryEvent.class); SagaState saga = sagaStateRepository.findByOrderId(event.getOrderId());
switch (event.getEventType()) { case "INVENTORY_DEDUCTED" -> { // 库存扣减成功 → 触发支付 saga.setState("PAYMENT_PROCESSING"); sendPaymentCommand(event.getOrderId()); } case "INVENTORY_DEDUCT_FAILED" -> { // 库存扣减失败 → 取消订单 saga.setState("COMPENSATING"); sendCancelOrderCommand(event.getOrderId()); } } sagaStateRepository.save(saga); }
// 支付结果 @KafkaListener(topics = "payment-events", groupId = "saga-orchestrator") public void onPaymentEvent(ConsumerRecord<String, String> record) { PaymentEvent event = JSON.parseObject(record.value(), PaymentEvent.class); SagaState saga = sagaStateRepository.findByOrderId(event.getOrderId());
switch (event.getEventType()) { case "PAYMENT_COMPLETED" -> { // 支付成功 → 确认订单 saga.setState("COMPLETED"); sendConfirmOrderCommand(event.getOrderId()); } case "PAYMENT_FAILED" -> { // 支付失败 → 补偿:恢复库存 + 取消订单 saga.setState("COMPENSATING"); sendRestoreInventoryCommand(event.getOrderId()); sendCancelOrderCommand(event.getOrderId()); } } sagaStateRepository.save(saga); }
private void sendPaymentCommand(String orderId) { // 通过 Outbox 发送支付命令 outboxService.send("payment-commands", orderId, JSON.toJSONString(Map.of("orderId", orderId, "command", "PROCESS_PAYMENT"))); }}5.4 Saga vs 其他方案
| 维度 | 本地消息表 | 事务性发件箱 | Saga | 2PC/XA |
|---|---|---|---|---|
| 一致性范围 | 单服务 | 单服务 | 跨服务 | 跨服务 |
| 一致性级别 | 最终一致 | 最终一致 | 最终一致 | 强一致 |
| 延迟 | 秒级 | 毫秒级 | 秒级 | 秒级 |
| 实现复杂度 | 低 | 中 | 高 | 高 |
| 补偿操作 | 不需要 | 不需要 | 必须实现 | 自动回滚 |
| 性能影响 | 低 | 低 | 中 | 高(锁等待) |
| 适用场景 | 单服务 DB+MQ | 单服务低延迟 | 跨服务编排 | 强一致需求 |
Saga 模式的核心代价是”补偿操作”——每个正向操作都必须有对应的补偿操作。如果业务操作不可逆(如银行转账已到账),Saga 就不适用,需要考虑 TCC 或人工对账。
六、RocketMQ 事务消息
6.1 原理
RocketMQ 的事务消息通过两阶段提交 + 回查机制,保证本地事务和消息发送的原子性:
6.2 实现
// 使用 RocketMQ 事务消息替代本地消息表TransactionMQProducer producer = new TransactionMQProducer("order-producer-group");producer.setTransactionListener(new TransactionListener() { @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { orderService.createOrder(parseOrder(msg)); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { Order order = orderService.getById(parseOrderId(msg)); return order != null ? COMMIT_MESSAGE : ROLLBACK_MESSAGE; }});七、方案全景对比
7.1 一致性方案选择决策树
7.2 全面对比表
| 方案 | 一致性 | 延迟 | 复杂度 | 额外组件 | 适用场景 |
|---|---|---|---|---|---|
| 本地消息表 | 最终一致 | 秒级 | 低 | 无 | 单服务、简单场景 |
| 事务性发件箱 | 最终一致 | 毫秒级 | 中 | Debezium | 单服务、低延迟 |
| RocketMQ 事务消息 | 最终一致 | 毫秒级 | 中 | RocketMQ | 单服务、已用 RocketMQ |
| Saga | 最终一致 | 秒级 | 高 | Saga 编排器 | 跨服务编排 |
| 2PC/XA | 强一致 | 秒级 | 高 | 事务协调器 | 强一致刚需 |
| Kafka 事务 API | Exactly-Once | 毫秒级 | 中 | Kafka | Kafka 内部一致性 |
大多数场景下,本地消息表或事务性发件箱就足够了。RocketMQ 事务消息是另一个好选择,但需要使用 RocketMQ。Saga 适用于跨服务编排,但每个操作必须有补偿。2PC/XA 适用于强一致场景,但性能代价大。选择方案时,优先考虑”最终一致 + 简单实现”,而非”强一致 + 复杂实现”。
7.3 生产环境推荐组合
| 团队规模 | 推荐方案 | 原因 |
|---|---|---|
| 小团队(< 10 人) | 本地消息表 | 实现简单,无额外组件 |
| 中团队(10-50 人) | 事务性发件箱 | 低延迟,CDC 可复用 |
| 大团队(50+ 人) | 事务性发件箱 + Saga | 单服务用发件箱,跨服务用 Saga |
| 已用 RocketMQ | RocketMQ 事务消息 | 无需额外组件 |
| 已用 Kafka | 事务性发件箱 + Debezium | Kafka 生态原生集成 |
八、总结
上一章深入探讨了消息队列选型决策。
| 维度 | 关键要点 |
|---|---|
| 双写问题 | DB 和 MQ 无法原子操作,导致消息丢失或多余 |
| 本地消息表 | 同一事务写入业务数据和消息表,定时轮询发送 |
| 事务性发件箱 | Outbox 表 + CDC 实时捕获,替代定时轮询 |
| CDC 同步 | Debezium/Canal 捕获 Binlog,实时同步到下游 |
| RocketMQ 事务消息 | 两阶段提交 + 回查,保证 DB 和消息一致 |
| Saga 模式 | 跨服务编排,正向操作 + 补偿操作 |
| 方案选择 | 优先本地消息表(简单),需要低延迟用发件箱+CDC,跨服务用 Saga |
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






