mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1580 字
4 分钟
事件溯源与 CQRS
2026-04-30

数据库里存的是”账户余额 1300 元”——但这个数字是怎么来的?存了 1000、取了 200、又存了 500?还是存了 2000、取了 700?传统 CRUD 模式只保存当前状态,历史信息在覆盖的那一刻就丢失了。**事件溯源(Event Sourcing)**反其道而行:不存当前状态,存每一次状态变更的事件。余额 1300 不是一行记录,而是 AccountCreated(1000)Deposited(500)Withdrawn(200) 三条事件的计算结果。配合 CQRS(命令查询职责分离),写入走事件流、读取走物化视图,读写各自优化到极致。

一、Event Sourcing 原理#

1.1 传统 CRUD vs Event Sourcing#

维度CRUD(状态存储)Event Sourcing(事件存储)
存储内容当前状态状态变更的历史事件
更新方式覆盖旧值追加新事件
历史信息丢失完整保留
审计追踪需要额外日志天然具备
时间旅行不支持重放事件到任意时间点
graph TB subgraph "CRUD:覆盖状态" CRUD1["余额: 1000"] -->|"存入 500"| CRUD2["余额: 1500"] CRUD2 -->|"取出 200"| CRUD3["余额: 1300"] Note1["历史信息丢失"] end subgraph "Event Sourcing:追加事件" ES1["Event: 账户创建"] --> ES2["Event: 存入 500"] ES2 --> ES3["Event: 取出 200"] Note2["通过重放事件计算当前状态<br/>1000 + 500 - 200 = 1300"] end

1.2 Event Sourcing 核心概念#

概念说明类比
Event不可变的状态变更记录银行流水
Aggregate事件的聚合体,业务一致性边界银行账户
Event Store事件的持久化存储银行流水账本
Projection从事件派生的读取模型银行对账单
// 事件定义
public interface DomainEvent {
String getAggregateId();
LocalDateTime getTimestamp();
}
public record OrderCreatedEvent(String orderId, String customerId, BigDecimal amount, LocalDateTime timestamp) implements DomainEvent {}
public record OrderPaidEvent(String orderId, BigDecimal paidAmount, LocalDateTime timestamp) implements DomainEvent {}
public record OrderShippedEvent(String orderId, String trackingNumber, LocalDateTime timestamp) implements DomainEvent {}
public record OrderCancelledEvent(String orderId, String reason, LocalDateTime timestamp) implements DomainEvent {}
// Aggregate:订单聚合根
public class OrderAggregate {
private String orderId;
private String customerId;
private BigDecimal amount;
private OrderStatus status;
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// 从事件重建状态
public static OrderAggregate fromEvents(List<DomainEvent> events) {
OrderAggregate order = new OrderAggregate();
events.forEach(order::apply);
return order;
}
// 业务操作:创建订单
public void create(String orderId, String customerId, BigDecimal amount) {
if (status != null) throw new IllegalStateException("Order already exists");
var event = new OrderCreatedEvent(orderId, customerId, amount, LocalDateTime.now());
apply(event);
uncommittedEvents.add(event);
}
// 业务操作:支付
public void pay(BigDecimal paidAmount) {
if (status != OrderStatus.CREATED) throw new IllegalStateException("Cannot pay: " + status);
var event = new OrderPaidEvent(orderId, paidAmount, LocalDateTime.now());
apply(event);
uncommittedEvents.add(event);
}
// 应用事件(更新状态)
private void apply(DomainEvent event) {
switch (event) {
case OrderCreatedEvent e -> {
this.orderId = e.orderId();
this.customerId = e.customerId();
this.amount = e.amount();
this.status = OrderStatus.CREATED;
}
case OrderPaidEvent e -> this.status = OrderStatus.PAID;
case OrderShippedEvent e -> this.status = OrderStatus.SHIPPED;
case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED;
}
}
}

1.3 Event Store 实现#

// Event Store 接口
public interface EventStore {
void append(String aggregateId, List<DomainEvent> events, long expectedVersion);
List<DomainEvent> load(String aggregateId);
List<DomainEvent> loadFromVersion(String aggregateId, long fromVersion);
}
// 基于 Kafka 的 Event Store
public class KafkaEventStore implements EventStore {
private final KafkaTemplate<String, String> kafkaTemplate;
private final ObjectMapper objectMapper;
@Override
public void append(String aggregateId, List<DomainEvent> events, long expectedVersion) {
// 乐观并发控制:检查版本号
List<DomainEvent> existing = load(aggregateId);
if (existing.size() != expectedVersion) {
throw new ConcurrentModificationException(
"Expected version " + expectedVersion + " but was " + existing.size());
}
// 追加事件到 Kafka
for (DomainEvent event : events) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"events-" + aggregateId, // 按 Aggregate 分区
aggregateId,
objectMapper.writeValueAsString(event)
);
kafkaTemplate.send(record).get(); // 同步发送确保持久化
}
}
@Override
public List<DomainEvent> load(String aggregateId) {
// 从 Kafka 读取所有事件
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
return records.records(new TopicPartition("events", partitionOf(aggregateId)))
.stream()
.map(this::deserializeEvent)
.toList();
}
}
Note

Event Sourcing 与消息队列天然契合——事件本身就是消息,Event Store 可以直接使用 Kafka Topic。每个 Aggregate 对应一个分区键,保证同一 Aggregate 的事件有序。

二、CQRS 模式#

2.1 CQRS 核心思想#

CQRS(Command Query Responsibility Segregation)将读操作和写操作分离到不同的模型:

graph TB subgraph "传统架构:读写共享模型" CLIENT1["Client"] --> API1["API"] API1 --> DB1["Database<br/>(读写共享)"] end subgraph "CQRS:读写分离模型" CLIENT2["Client"] --> CMD["Command<br/>(写模型)"] CLIENT2 --> QRY["Query<br/>(读模型)"] CMD --> ES["Event Store<br/>(写存储)"] ES -->|"事件投影"| RM["Read Model<br/>(读存储)"] QRY --> RM end
维度传统架构CQRS
数据模型读写共享读写分离
写模型3NF 规范化Event Store
读模型3NF 规范化反规范化视图
一致性强一致最终一致
扩展性读写同时扩展读写独立扩展
复杂度

2.2 Command 端实现#

// Command 定义
public record CreateOrderCommand(String orderId, String customerId, BigDecimal amount) {}
public record PayOrderCommand(String orderId, BigDecimal amount) {}
public record ShipOrderCommand(String orderId, String trackingNumber) {}
public record CancelOrderCommand(String orderId, String reason) {}
// Command Handler
@Service
public class OrderCommandHandler {
private final EventStore eventStore;
@Transactional
public void handle(CreateOrderCommand cmd) {
OrderAggregate order = new OrderAggregate();
order.create(cmd.orderId(), cmd.customerId(), cmd.amount());
eventStore.append(cmd.orderId(), order.getUncommittedEvents(), 0);
order.markEventsAsCommitted();
}
@Transactional
public void handle(PayOrderCommand cmd) {
List<DomainEvent> events = eventStore.load(cmd.orderId());
OrderAggregate order = OrderAggregate.fromEvents(events);
order.pay(cmd.amount());
eventStore.append(cmd.orderId(), order.getUncommittedEvents(), events.size());
order.markEventsAsCommitted();
}
}

2.3 Query 端实现#

// 事件投影:将事件转换为读模型
@Component
public class OrderProjection {
@KafkaListener(topics = "order-events")
public void handleEvent(ConsumerRecord<String, String> record) {
DomainEvent event = deserializeEvent(record.value());
switch (event) {
case OrderCreatedEvent e -> {
OrderReadModel readModel = new OrderReadModel();
readModel.setOrderId(e.orderId());
readModel.setCustomerId(e.customerId());
readModel.setAmount(e.amount());
readModel.setStatus("CREATED");
readModel.setCreatedAt(e.timestamp());
orderReadRepository.save(readModel);
}
case OrderPaidEvent e -> {
OrderReadModel readModel = orderReadRepository.findById(e.orderId()).orElseThrow();
readModel.setStatus("PAID");
readModel.setPaidAt(e.timestamp());
orderReadRepository.save(readModel);
}
case OrderShippedEvent e -> {
OrderReadModel readModel = orderReadRepository.findById(e.orderId()).orElseThrow();
readModel.setStatus("SHIPPED");
readModel.setTrackingNumber(e.trackingNumber());
orderReadRepository.save(readModel);
}
case OrderCancelledEvent e -> {
OrderReadModel readModel = orderReadRepository.findById(e.orderId()).orElseThrow();
readModel.setStatus("CANCELLED");
readModel.setCancelReason(e.reason());
orderReadRepository.save(readModel);
}
}
}
}
// Query Service:直接查询读模型
@Service
public class OrderQueryService {
public OrderReadModel getOrder(String orderId) {
return orderReadRepository.findById(orderId).orElseThrow();
}
public List<OrderReadModel> getOrdersByCustomer(String customerId) {
return orderReadRepository.findByCustomerId(customerId);
}
public Page<OrderReadModel> searchOrders(OrderSearchCriteria criteria) {
return orderReadRepository.search(criteria);
}
}

2.4 多种读模型#

CQRS 的强大之处在于可以为不同的查询需求创建不同的读模型:

读模型存储类型适用查询示例
关系型视图MySQL/PostgreSQL复杂条件查询订单列表、客户详情
搜索索引Elasticsearch全文搜索订单搜索
时序数据TimescaleDB趋势分析订单量趋势
图数据Neo4j关系查询客户关联分析
缓存Redis高频读取热点订单
graph TB ES["Event Store<br/>(Kafka)"] --> P1["Projection 1<br/>→ MySQL"] ES --> P2["Projection 2<br/>→ Elasticsearch"] ES --> P3["Projection 3<br/>→ Redis"] ES --> P4["Projection 4<br/>→ TimescaleDB"] Q1["查询: 订单列表"] --> P1 Q2["查询: 订单搜索"] --> P2 Q3["查询: 热点订单"] --> P3 Q4["查询: 订单趋势"] --> P4

三、快照机制#

3.1 为什么需要快照?#

当 Aggregate 的事件数量很大时,从所有事件重建状态会很慢:

事件数量重建时间问题
100< 1ms
10,000~10ms可接受
1,000,000~1s不可接受
100,000,000~100s严重问题

快照机制定期保存 Aggregate 的当前状态,重建时从最近的快照开始:

graph LR E1["Event 1"] --> E2["Event 2"] --> E3["..."] --> S["Snapshot<br/>(version=1000)"] S --> E4["Event 1001"] --> E5["Event 1002"] --> E6["..."] Note["重建:加载快照 + 重放 Event 1001+<br/>而非重放全部事件"]
// 快照实现
public class SnapshotStore {
private static final int SNAPSHOT_INTERVAL = 1000; // 每 1000 个事件一个快照
public void maybeSaveSnapshot(String aggregateId, List<DomainEvent> allEvents) {
if (allEvents.size() % SNAPSHOT_INTERVAL == 0) {
OrderAggregate aggregate = OrderAggregate.fromEvents(allEvents);
Snapshot snapshot = new Snapshot(
aggregateId,
allEvents.size(),
aggregate.getState(),
LocalDateTime.now()
);
snapshotRepository.save(snapshot);
}
}
public OrderAggregate loadWithSnapshot(String aggregateId) {
// 1. 加载最近的快照
Snapshot snapshot = snapshotRepository.findLatest(aggregateId);
// 2. 加载快照之后的事件
List<DomainEvent> events;
if (snapshot != null) {
events = eventStore.loadFromVersion(aggregateId, snapshot.getVersion());
OrderAggregate aggregate = OrderAggregate.fromSnapshot(snapshot);
events.forEach(aggregate::apply);
return aggregate;
} else {
events = eventStore.load(aggregateId);
return OrderAggregate.fromEvents(events);
}
}
}

3.2 快照策略#

策略触发条件优点缺点
定期快照每 N 个事件简单可能保存不必要的快照
基于大小事件总大小超阈值控制存储需要跟踪大小
基于时间每小时/每天可预测可能错过高频变更
按需快照加载时触发无额外写入首次加载仍慢
Warning

快照引入了新的复杂度:1)快照与事件的一致性——快照保存失败会导致状态不一致;2)快照的版本兼容——Aggregate 结构变更后旧快照可能无法使用;3)快照的存储成本——每个快照都是完整状态的副本。

四、Event Sourcing + CQRS 完整架构#

4.1 端到端流程#

sequenceDiagram participant C as Client participant CH as Command Handler participant ES as Event Store participant K as Kafka participant PJ as Projection participant RM as Read Model participant QS as Query Service C->>CH: CreateOrderCommand CH->>CH: 创建 Aggregate CH->>ES: 保存事件 ES->>K: 发布事件 K->>PJ: 消费事件 PJ->>RM: 更新读模型 C->>QS: 查询订单 QS->>RM: 读取 RM-->>QS: 返回结果 QS-->>C: 返回订单

4.2 最终一致性#

CQRS 的读模型是最终一致的——写入后立即读取可能看不到最新状态:

一致性级别延迟实现方式
强一致0读写同一存储(牺牲 CQRS 优势)
近实时< 100ms同步投影
最终一致秒级异步投影(推荐)
// 处理最终一致性的策略
@RestController
@RequestMapping("/api/orders")
public class OrderController {
@PostMapping
public ResponseEntity<OrderResponse> createOrder(@RequestBody CreateOrderRequest req) {
// 写入
commandHandler.handle(new CreateOrderCommand(req.getOrderId(), req.getCustomerId(), req.getAmount()));
// 策略一:返回 202 Accepted,客户端轮询
return ResponseEntity.accepted()
.body(new OrderResponse(req.getOrderId(), "PENDING", "Order is being processed"));
// 策略二:等待读模型更新(带超时)
// awaitReadModelUpdate(req.getOrderId(), Duration.ofSeconds(5));
}
}

五、Event Sourcing vs CRUD 对比#

维度CRUDEvent Sourcing
数据模型当前状态事件序列
更新方式覆盖追加
历史信息丢失完整保留
审计追踪额外实现天然支持
时间旅行不支持重放事件
复杂度
性能(写)高(追加)
性能(读)需要投影
一致性强一致最终一致
适用场景简单 CRUD复杂业务逻辑、审计需求
Tip

Event Sourcing 不是银弹——它带来了审计追踪和时间旅行的能力,但也引入了最终一致性、事件版本兼容和复杂度等挑战。选择 Event Sourcing 的判断标准:1)业务是否需要完整的变更历史?2)业务逻辑是否复杂到需要从事件重建状态?3)团队是否有能力管理最终一致性?如果答案都是”是”,Event Sourcing 值得考虑。

5.1 事件版本兼容#

当业务演进时,事件的结构可能需要变更,这带来了版本兼容问题:

策略说明优点缺点
向上兼容新版本能读旧数据简单限制变更范围
多版本处理每个版本一个处理器灵活代码膨胀
事件升级读取时转换旧版本透明需要升级函数
不可变事件永不修改旧事件,只发新类型最安全事件类型膨胀
// 事件升级器:将旧版本事件转换为新版本
public class EventUpgrader {
public DomainEvent upgrade(DomainEvent event) {
return switch (event) {
case OrderCreatedEventV1 v1 -> upgradeV1ToV2(v1);
case OrderCreatedEventV2 v2 -> v2; // 最新版本,无需升级
default -> event;
};
}
private OrderCreatedEventV2 upgradeV1ToV2(OrderCreatedEventV1 v1) {
// V1 没有 currency 字段,V2 新增了 currency
return new OrderCreatedEventV2(
v1.orderId(), v1.customerId(), v1.amount(),
"CNY", // 默认值
v1.timestamp()
);
}
}

5.2 事件存储查询实践#

-- 事件存储表的典型 Schema
CREATE TABLE event_store (
event_id UUID PRIMARY KEY,
aggregate_id UUID NOT NULL,
event_type VARCHAR(100) NOT NULL,
event_data JSONB NOT NULL,
version BIGINT NOT NULL,
timestamp TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (aggregate_id, version) -- 乐观锁:同一聚合版本唯一
);
-- 查询某个聚合的完整事件历史
SELECT event_type, event_data, version, timestamp
FROM event_store
WHERE aggregate_id = 'order-12345'
ORDER BY version ASC;
-- 查询某类事件(投影构建)
SELECT event_data->>'product_id' AS product_id,
COUNT(*) AS order_count,
SUM((event_data->>'amount')::numeric) AS total_amount
FROM event_store
WHERE event_type = 'OrderPlaced'
AND timestamp >= NOW() - INTERVAL '7 days'
GROUP BY event_data->>'product_id';

六、总结#

上一章建立了消息积压与反压机制的认知框架。

维度关键要点
Event Sourcing以事件序列替代状态存储,保留完整历史
CQRS读写分离,写模型用 Event Store,读模型用反规范化视图
快照定期保存 Aggregate 状态,加速重建
投影从事件派生多种读模型,适配不同查询需求
最终一致性CQRS 的代价,需要客户端感知和处理
适用场景复杂业务逻辑、审计需求、事件驱动系统

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

事件溯源与 CQRS
https://blog.souloss.com/posts/messaging/event-sourcing-and-cqrs/
作者
Souloss
发布于
2026-04-30
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时