数据库里存的是”账户余额 1300 元”——但这个数字是怎么来的?存了 1000、取了 200、又存了 500?还是存了 2000、取了 700?传统 CRUD 模式只保存当前状态,历史信息在覆盖的那一刻就丢失了。**事件溯源(Event Sourcing)**反其道而行:不存当前状态,存每一次状态变更的事件。余额 1300 不是一行记录,而是 AccountCreated(1000) → Deposited(500) → Withdrawn(200) 三条事件的计算结果。配合 CQRS(命令查询职责分离),写入走事件流、读取走物化视图,读写各自优化到极致。
一、Event Sourcing 原理
1.1 传统 CRUD vs Event Sourcing
| 维度 | CRUD(状态存储) | Event Sourcing(事件存储) |
|---|---|---|
| 存储内容 | 当前状态 | 状态变更的历史事件 |
| 更新方式 | 覆盖旧值 | 追加新事件 |
| 历史信息 | 丢失 | 完整保留 |
| 审计追踪 | 需要额外日志 | 天然具备 |
| 时间旅行 | 不支持 | 重放事件到任意时间点 |
1.2 Event Sourcing 核心概念
| 概念 | 说明 | 类比 |
|---|---|---|
| Event | 不可变的状态变更记录 | 银行流水 |
| Aggregate | 事件的聚合体,业务一致性边界 | 银行账户 |
| Event Store | 事件的持久化存储 | 银行流水账本 |
| Projection | 从事件派生的读取模型 | 银行对账单 |
// 事件定义public interface DomainEvent { String getAggregateId(); LocalDateTime getTimestamp();}
public record OrderCreatedEvent(String orderId, String customerId, BigDecimal amount, LocalDateTime timestamp) implements DomainEvent {}public record OrderPaidEvent(String orderId, BigDecimal paidAmount, LocalDateTime timestamp) implements DomainEvent {}public record OrderShippedEvent(String orderId, String trackingNumber, LocalDateTime timestamp) implements DomainEvent {}public record OrderCancelledEvent(String orderId, String reason, LocalDateTime timestamp) implements DomainEvent {}
// Aggregate:订单聚合根public class OrderAggregate { private String orderId; private String customerId; private BigDecimal amount; private OrderStatus status; private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// 从事件重建状态 public static OrderAggregate fromEvents(List<DomainEvent> events) { OrderAggregate order = new OrderAggregate(); events.forEach(order::apply); return order; }
// 业务操作:创建订单 public void create(String orderId, String customerId, BigDecimal amount) { if (status != null) throw new IllegalStateException("Order already exists"); var event = new OrderCreatedEvent(orderId, customerId, amount, LocalDateTime.now()); apply(event); uncommittedEvents.add(event); }
// 业务操作:支付 public void pay(BigDecimal paidAmount) { if (status != OrderStatus.CREATED) throw new IllegalStateException("Cannot pay: " + status); var event = new OrderPaidEvent(orderId, paidAmount, LocalDateTime.now()); apply(event); uncommittedEvents.add(event); }
// 应用事件(更新状态) private void apply(DomainEvent event) { switch (event) { case OrderCreatedEvent e -> { this.orderId = e.orderId(); this.customerId = e.customerId(); this.amount = e.amount(); this.status = OrderStatus.CREATED; } case OrderPaidEvent e -> this.status = OrderStatus.PAID; case OrderShippedEvent e -> this.status = OrderStatus.SHIPPED; case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED; } }}1.3 Event Store 实现
// Event Store 接口public interface EventStore { void append(String aggregateId, List<DomainEvent> events, long expectedVersion); List<DomainEvent> load(String aggregateId); List<DomainEvent> loadFromVersion(String aggregateId, long fromVersion);}
// 基于 Kafka 的 Event Storepublic class KafkaEventStore implements EventStore { private final KafkaTemplate<String, String> kafkaTemplate; private final ObjectMapper objectMapper;
@Override public void append(String aggregateId, List<DomainEvent> events, long expectedVersion) { // 乐观并发控制:检查版本号 List<DomainEvent> existing = load(aggregateId); if (existing.size() != expectedVersion) { throw new ConcurrentModificationException( "Expected version " + expectedVersion + " but was " + existing.size()); }
// 追加事件到 Kafka for (DomainEvent event : events) { ProducerRecord<String, String> record = new ProducerRecord<>( "events-" + aggregateId, // 按 Aggregate 分区 aggregateId, objectMapper.writeValueAsString(event) ); kafkaTemplate.send(record).get(); // 同步发送确保持久化 } }
@Override public List<DomainEvent> load(String aggregateId) { // 从 Kafka 读取所有事件 ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5)); return records.records(new TopicPartition("events", partitionOf(aggregateId))) .stream() .map(this::deserializeEvent) .toList(); }}Event Sourcing 与消息队列天然契合——事件本身就是消息,Event Store 可以直接使用 Kafka Topic。每个 Aggregate 对应一个分区键,保证同一 Aggregate 的事件有序。
二、CQRS 模式
2.1 CQRS 核心思想
CQRS(Command Query Responsibility Segregation)将读操作和写操作分离到不同的模型:
| 维度 | 传统架构 | CQRS |
|---|---|---|
| 数据模型 | 读写共享 | 读写分离 |
| 写模型 | 3NF 规范化 | Event Store |
| 读模型 | 3NF 规范化 | 反规范化视图 |
| 一致性 | 强一致 | 最终一致 |
| 扩展性 | 读写同时扩展 | 读写独立扩展 |
| 复杂度 | 低 | 高 |
2.2 Command 端实现
// Command 定义public record CreateOrderCommand(String orderId, String customerId, BigDecimal amount) {}public record PayOrderCommand(String orderId, BigDecimal amount) {}public record ShipOrderCommand(String orderId, String trackingNumber) {}public record CancelOrderCommand(String orderId, String reason) {}
// Command Handler@Servicepublic class OrderCommandHandler { private final EventStore eventStore;
@Transactional public void handle(CreateOrderCommand cmd) { OrderAggregate order = new OrderAggregate(); order.create(cmd.orderId(), cmd.customerId(), cmd.amount()); eventStore.append(cmd.orderId(), order.getUncommittedEvents(), 0); order.markEventsAsCommitted(); }
@Transactional public void handle(PayOrderCommand cmd) { List<DomainEvent> events = eventStore.load(cmd.orderId()); OrderAggregate order = OrderAggregate.fromEvents(events); order.pay(cmd.amount()); eventStore.append(cmd.orderId(), order.getUncommittedEvents(), events.size()); order.markEventsAsCommitted(); }}2.3 Query 端实现
// 事件投影:将事件转换为读模型@Componentpublic class OrderProjection {
@KafkaListener(topics = "order-events") public void handleEvent(ConsumerRecord<String, String> record) { DomainEvent event = deserializeEvent(record.value());
switch (event) { case OrderCreatedEvent e -> { OrderReadModel readModel = new OrderReadModel(); readModel.setOrderId(e.orderId()); readModel.setCustomerId(e.customerId()); readModel.setAmount(e.amount()); readModel.setStatus("CREATED"); readModel.setCreatedAt(e.timestamp()); orderReadRepository.save(readModel); } case OrderPaidEvent e -> { OrderReadModel readModel = orderReadRepository.findById(e.orderId()).orElseThrow(); readModel.setStatus("PAID"); readModel.setPaidAt(e.timestamp()); orderReadRepository.save(readModel); } case OrderShippedEvent e -> { OrderReadModel readModel = orderReadRepository.findById(e.orderId()).orElseThrow(); readModel.setStatus("SHIPPED"); readModel.setTrackingNumber(e.trackingNumber()); orderReadRepository.save(readModel); } case OrderCancelledEvent e -> { OrderReadModel readModel = orderReadRepository.findById(e.orderId()).orElseThrow(); readModel.setStatus("CANCELLED"); readModel.setCancelReason(e.reason()); orderReadRepository.save(readModel); } } }}
// Query Service:直接查询读模型@Servicepublic class OrderQueryService { public OrderReadModel getOrder(String orderId) { return orderReadRepository.findById(orderId).orElseThrow(); }
public List<OrderReadModel> getOrdersByCustomer(String customerId) { return orderReadRepository.findByCustomerId(customerId); }
public Page<OrderReadModel> searchOrders(OrderSearchCriteria criteria) { return orderReadRepository.search(criteria); }}2.4 多种读模型
CQRS 的强大之处在于可以为不同的查询需求创建不同的读模型:
| 读模型 | 存储类型 | 适用查询 | 示例 |
|---|---|---|---|
| 关系型视图 | MySQL/PostgreSQL | 复杂条件查询 | 订单列表、客户详情 |
| 搜索索引 | Elasticsearch | 全文搜索 | 订单搜索 |
| 时序数据 | TimescaleDB | 趋势分析 | 订单量趋势 |
| 图数据 | Neo4j | 关系查询 | 客户关联分析 |
| 缓存 | Redis | 高频读取 | 热点订单 |
三、快照机制
3.1 为什么需要快照?
当 Aggregate 的事件数量很大时,从所有事件重建状态会很慢:
| 事件数量 | 重建时间 | 问题 |
|---|---|---|
| 100 | < 1ms | 无 |
| 10,000 | ~10ms | 可接受 |
| 1,000,000 | ~1s | 不可接受 |
| 100,000,000 | ~100s | 严重问题 |
快照机制定期保存 Aggregate 的当前状态,重建时从最近的快照开始:
// 快照实现public class SnapshotStore { private static final int SNAPSHOT_INTERVAL = 1000; // 每 1000 个事件一个快照
public void maybeSaveSnapshot(String aggregateId, List<DomainEvent> allEvents) { if (allEvents.size() % SNAPSHOT_INTERVAL == 0) { OrderAggregate aggregate = OrderAggregate.fromEvents(allEvents); Snapshot snapshot = new Snapshot( aggregateId, allEvents.size(), aggregate.getState(), LocalDateTime.now() ); snapshotRepository.save(snapshot); } }
public OrderAggregate loadWithSnapshot(String aggregateId) { // 1. 加载最近的快照 Snapshot snapshot = snapshotRepository.findLatest(aggregateId);
// 2. 加载快照之后的事件 List<DomainEvent> events; if (snapshot != null) { events = eventStore.loadFromVersion(aggregateId, snapshot.getVersion()); OrderAggregate aggregate = OrderAggregate.fromSnapshot(snapshot); events.forEach(aggregate::apply); return aggregate; } else { events = eventStore.load(aggregateId); return OrderAggregate.fromEvents(events); } }}3.2 快照策略
| 策略 | 触发条件 | 优点 | 缺点 |
|---|---|---|---|
| 定期快照 | 每 N 个事件 | 简单 | 可能保存不必要的快照 |
| 基于大小 | 事件总大小超阈值 | 控制存储 | 需要跟踪大小 |
| 基于时间 | 每小时/每天 | 可预测 | 可能错过高频变更 |
| 按需快照 | 加载时触发 | 无额外写入 | 首次加载仍慢 |
快照引入了新的复杂度:1)快照与事件的一致性——快照保存失败会导致状态不一致;2)快照的版本兼容——Aggregate 结构变更后旧快照可能无法使用;3)快照的存储成本——每个快照都是完整状态的副本。
四、Event Sourcing + CQRS 完整架构
4.1 端到端流程
4.2 最终一致性
CQRS 的读模型是最终一致的——写入后立即读取可能看不到最新状态:
| 一致性级别 | 延迟 | 实现方式 |
|---|---|---|
| 强一致 | 0 | 读写同一存储(牺牲 CQRS 优势) |
| 近实时 | < 100ms | 同步投影 |
| 最终一致 | 秒级 | 异步投影(推荐) |
// 处理最终一致性的策略@RestController@RequestMapping("/api/orders")public class OrderController {
@PostMapping public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest req) { // 写入 commandHandler.handle(new CreateOrderCommand(req.getOrderId(), req.getCustomerId(), req.getAmount()));
// 策略一:返回 202 Accepted,客户端轮询 return ResponseEntity.accepted() .body(new OrderResponse(req.getOrderId(), "PENDING", "Order is being processed"));
// 策略二:等待读模型更新(带超时) // awaitReadModelUpdate(req.getOrderId(), Duration.ofSeconds(5)); }}五、Event Sourcing vs CRUD 对比
| 维度 | CRUD | Event Sourcing |
|---|---|---|
| 数据模型 | 当前状态 | 事件序列 |
| 更新方式 | 覆盖 | 追加 |
| 历史信息 | 丢失 | 完整保留 |
| 审计追踪 | 额外实现 | 天然支持 |
| 时间旅行 | 不支持 | 重放事件 |
| 复杂度 | 低 | 高 |
| 性能(写) | 中 | 高(追加) |
| 性能(读) | 高 | 需要投影 |
| 一致性 | 强一致 | 最终一致 |
| 适用场景 | 简单 CRUD | 复杂业务逻辑、审计需求 |
Event Sourcing 不是银弹——它带来了审计追踪和时间旅行的能力,但也引入了最终一致性、事件版本兼容和复杂度等挑战。选择 Event Sourcing 的判断标准:1)业务是否需要完整的变更历史?2)业务逻辑是否复杂到需要从事件重建状态?3)团队是否有能力管理最终一致性?如果答案都是”是”,Event Sourcing 值得考虑。
5.1 事件版本兼容
当业务演进时,事件的结构可能需要变更,这带来了版本兼容问题:
| 策略 | 说明 | 优点 | 缺点 |
|---|---|---|---|
| 向上兼容 | 新版本能读旧数据 | 简单 | 限制变更范围 |
| 多版本处理 | 每个版本一个处理器 | 灵活 | 代码膨胀 |
| 事件升级 | 读取时转换旧版本 | 透明 | 需要升级函数 |
| 不可变事件 | 永不修改旧事件,只发新类型 | 最安全 | 事件类型膨胀 |
// 事件升级器:将旧版本事件转换为新版本public class EventUpgrader { public DomainEvent upgrade(DomainEvent event) { return switch (event) { case OrderCreatedEventV1 v1 -> upgradeV1ToV2(v1); case OrderCreatedEventV2 v2 -> v2; // 最新版本,无需升级 default -> event; }; }
private OrderCreatedEventV2 upgradeV1ToV2(OrderCreatedEventV1 v1) { // V1 没有 currency 字段,V2 新增了 currency return new OrderCreatedEventV2( v1.orderId(), v1.customerId(), v1.amount(), "CNY", // 默认值 v1.timestamp() ); }}5.2 事件存储查询实践
-- 事件存储表的典型 SchemaCREATE TABLE event_store ( event_id UUID PRIMARY KEY, aggregate_id UUID NOT NULL, event_type VARCHAR(100) NOT NULL, event_data JSONB NOT NULL, version BIGINT NOT NULL, timestamp TIMESTAMPTZ DEFAULT NOW(), UNIQUE (aggregate_id, version) -- 乐观锁:同一聚合版本唯一);
-- 查询某个聚合的完整事件历史SELECT event_type, event_data, version, timestampFROM event_storeWHERE aggregate_id = 'order-12345'ORDER BY version ASC;
-- 查询某类事件(投影构建)SELECT event_data->>'product_id' AS product_id, COUNT(*) AS order_count, SUM((event_data->>'amount')::numeric) AS total_amountFROM event_storeWHERE event_type = 'OrderPlaced' AND timestamp >= NOW() - INTERVAL '7 days'GROUP BY event_data->>'product_id';六、总结
上一章建立了消息积压与反压机制的认知框架。
| 维度 | 关键要点 |
|---|---|
| Event Sourcing | 以事件序列替代状态存储,保留完整历史 |
| CQRS | 读写分离,写模型用 Event Store,读模型用反规范化视图 |
| 快照 | 定期保存 Aggregate 状态,加速重建 |
| 投影 | 从事件派生多种读模型,适配不同查询需求 |
| 最终一致性 | CQRS 的代价,需要客户端感知和处理 |
| 适用场景 | 复杂业务逻辑、审计需求、事件驱动系统 |
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






