Hands-On Capstone: Building an Event-Driven System
2026-05-16

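The previous 15 chapters took apart message semantics, Kafka's architecture, the storage engine, reliability, stream processing, a comparison of the four major systems, ordering, backlog, event sourcing, Schema evolution, technology selection and consistency, one piece at a time. Now it is time to assemble them into a complete system: an event-driven e-commerce order system built on Kafka + Schema Registry + Kafka Streams + the transactional outbox, which welds the scattered pieces of knowledge into one runnable architecture.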

1. System Design

1.1 Business Scenario

We are building an e-commerce order system with the following core requirements:

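| Requirement | Description | Related chapter |
|---|---|---|
| No loss, no duplicates | Order events must be delivered reliably | Message semantics |
| Schema management | Message formats need versioning and compatibility rules | Schema evolution |
| Stream processing | Compute order statistics in real time | [Kafka Streams](./06-Kafka Streams.md) |
| DB + MQ consistency | Creating an order and sending its message must succeed or fail together | Messages and databases |
| Message ordering | Events of the same order must stay in order | Message ordering |
| Backlog handling | Scale elastically during big sales events | Message backlog |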

1.2 System Architecture

```mermaid
graph TB
    subgraph "Order Service"
        ORDER_API["Order API"] --> ORDER_SERVICE["Order Service"]
        ORDER_SERVICE --> DB["MySQL<br/>orders + outbox"]
        ORDER_SERVICE --> SR["Schema Registry"]
    end
    subgraph "Messaging Infrastructure"
        KAFKA["Kafka Cluster<br/>3 Brokers"]
        SR --> KAFKA
        KAFKA --> STREAMS["Kafka Streams<br/>Order Analytics"]
    end
    subgraph "Downstream Services"
        KAFKA --> PAYMENT["Payment Service"]
        KAFKA --> INVENTORY["Inventory Service"]
        KAFKA --> NOTIFY["Notification Service"]
        KAFKA --> ES["Elasticsearch<br/>(via CDC)"]
    end
    subgraph "Monitoring"
        KAFKA --> PROM["Prometheus"]
        PROM --> GRAFANA["Grafana Dashboard"]
    end
    DB -->|"Debezium CDC (outbox)"| KAFKA
```

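1.3 Topic Design

| Topic | Partitions | Replicas | Retention | Schema |
|---|---|---|---|---|
| order-events | 16 | 3 | 7 days | Avro (OrderEvent) |
| order-stats | 8 | 3 | 30 days | Avro (OrderStats) |
| payment-events | 8 | 3 | 7 days | Avro (PaymentEvent) |
| inventory-events | 8 | 3 | 7 days | Avro (InventoryEvent) |
| order-alerts | 4 | 3 | 3 days | Avro (OrderAlert) |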
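Events of the same order must stay in order (section 1.1), and with 16 partitions on order-events that depends on the message key: for a record with a non-null key, Kafka's default partitioner hashes the key (murmur2) modulo the partition count, so every event keyed by the same orderId lands in one partition and keeps its relative order. The sketch below models only that property. It uses `String.hashCode()` as a stand-in hash (an assumption, so the partition numbers will not match Kafka's), and `KeyPartitioningSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of key-based partitioning. String.hashCode() stands in for
// Kafka's murmur2, so real partition numbers differ; the ordering property holds.
class KeyPartitioningSketch {
    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit so the modulo result is never negative
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Append each {orderId, eventType} pair to its partition's log in send order
    static Map<Integer, List<String>> route(List<String[]> events, int numPartitions) {
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        for (String[] e : events) {
            int p = partitionFor(e[0], numPartitions);
            partitions.computeIfAbsent(p, k -> new ArrayList<>()).add(e[0] + ":" + e[1]);
        }
        return partitions;
    }
}
```

Two different orders may share a partition, but one order never spans two partitions; that is what makes per-order ordering possible without global ordering.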

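2. Schema Definition and Registration

2.1 Schema Registration and Compatibility

Note

Registering schemas is the first step before the system goes live: define the schema first, then build the producers and consumers. This gives every participant a shared understanding of the message format.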
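As a sketch of that first step, assuming a registry at `http://localhost:8081` (the port exposed in the Docker Compose file later in this chapter) and the default subject naming `<topic>-value`, the two REST calls below set the subject's compatibility level and then register the schema. `SchemaRegistration` and its methods are hypothetical helpers built on `java.net.http`; the endpoints themselves (`PUT /config/{subject}`, `POST /subjects/{subject}/versions`) belong to Schema Registry's REST API.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch: configure compatibility and register the OrderEvent value schema
// through Schema Registry's REST API. Helper names are hypothetical.
class SchemaRegistration {
    static final String CONTENT_TYPE = "application/vnd.schemaregistry.v1+json";

    // The registry expects the Avro schema as a JSON *string*, so escape it
    static String registrationBody(String avroSchemaJson) {
        String escaped = avroSchemaJson.replace("\\", "\\\\").replace("\"", "\\\"")
                .replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t");
        return "{\"schema\":\"" + escaped + "\"}";
    }

    static HttpRequest setCompatibility(String registryUrl, String subject, String level) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/config/" + subject))
                .header("Content-Type", CONTENT_TYPE)
                .PUT(HttpRequest.BodyPublishers.ofString("{\"compatibility\":\"" + level + "\"}"))
                .build();
    }

    static HttpRequest register(String registryUrl, String subject, String avroSchemaJson) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/subjects/" + subject + "/versions"))
                .header("Content-Type", CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(registrationBody(avroSchemaJson)))
                .build();
    }

    // Set the level first, then register; returns the registry's reply body
    // (it contains the schema id). Status codes are not checked in this sketch.
    static String run(String registryUrl, String subject, String avroSchemaJson) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        client.send(setCompatibility(registryUrl, subject, "BACKWARD"), HttpResponse.BodyHandlers.ofString());
        return client.send(register(registryUrl, subject, avroSchemaJson),
                HttpResponse.BodyHandlers.ofString()).body();
    }
}
```

A call such as `SchemaRegistration.run("http://localhost:8081", "order-events-value", schemaJson)` against a running registry puts the compatibility rule in place before any later version of the schema is registered.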

2.2 Schema Evolution in Practice

When new fields are added to OrderEvent, the schema must stay backward compatible. The `eventType` symbols use the same `ORDER_*` names as the outbox rows and the consumers in the following sections:

```json
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.order",
  "fields": [
    {"name": "eventId", "type": "string"},
    {"name": "eventType", "type": {"type": "enum", "name": "OrderEventType", "symbols": ["ORDER_CREATED", "ORDER_PAID", "ORDER_SHIPPED", "ORDER_CANCELLED", "ORDER_REFUNDED"]}},
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "CNY"},
    {"name": "items", "type": {"type": "array", "items": "string"}, "default": []},
    {"name": "timestamp", "type": "long"},
    {"name": "version", "type": "int", "default": 1},
    {"name": "shippingAddress", "type": ["null", "string"], "default": null},
    {"name": "priority", "type": "int", "default": 0}
  ]
}
```

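Every new field must have a default value. When a new consumer reads an old message, the missing field is filled with its default: that is backward compatibility, the default compatibility mode in Schema Registry. When an old consumer reads a new message, it simply ignores the field it does not know: that is forward compatibility. Adding a field with a default therefore satisfies both directions (FULL compatibility), while adding a field without a default breaks backward compatibility, because new consumers have nothing to fill in when they read old messages.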
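A deliberately simplified model of Avro's schema resolution makes the two directions concrete. It is not the Avro library (real resolution also covers type promotion, aliases and unions): reader fields missing from the written data take the reader's default, and written fields the reader does not declare are dropped. `SchemaResolutionSketch` is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of reader/writer schema resolution (not the Avro library)
class SchemaResolutionSketch {
    // A reader-side field: a name plus an optional default value
    record Field(String name, boolean hasDefault, Object defaultValue) {
        static Field required(String name) { return new Field(name, false, null); }
        static Field withDefault(String name, Object value) { return new Field(name, true, value); }
    }

    // Resolve data written with some writer schema against the reader's fields
    static Map<String, Object> resolve(Map<String, Object> written, List<Field> readerFields) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Field f : readerFields) {
            if (written.containsKey(f.name())) {
                result.put(f.name(), written.get(f.name()));  // present in both schemas
            } else if (f.hasDefault()) {
                result.put(f.name(), f.defaultValue());       // writer lacks it: use reader default
            } else {
                throw new IllegalStateException("No default for missing field " + f.name());
            }
        }
        return result; // fields known only to the writer are silently ignored
    }
}
```

The exception branch is exactly the "new field without a default" case: a new reader meets old data that lacks the field and has nothing to fill in.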

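3. Transactional Outbox Implementation

3.1 Order Service Implementation

```java
import java.math.BigDecimal;
import java.util.Map;
import java.util.UUID;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class OrderService {
    private final OrderRepository orderRepository;
    private final OutboxEventRepository outboxRepository;

    // Constructor injection assigns the final fields
    public OrderService(OrderRepository orderRepository, OutboxEventRepository outboxRepository) {
        this.orderRepository = orderRepository;
        this.outboxRepository = outboxRepository;
    }

    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        // 1. Create the order
        Order order = new Order();
        order.setId(UUID.randomUUID().toString());
        order.setCustomerId(request.getCustomerId());
        order.setAmount(request.getAmount());
        order.setStatus(OrderStatus.CREATED);
        order.setItems(request.getItems());
        orderRepository.save(order);

        // 2. Write the outbox row in the same local transaction: both rows commit or neither does.
        //    JSON.toJSONString is a fastjson-style serializer.
        OutboxEvent event = new OutboxEvent();
        event.setId(UUID.randomUUID().toString());
        event.setAggregateType("Order");
        event.setAggregateId(order.getId());
        event.setEventType("ORDER_CREATED");
        event.setPayload(JSON.toJSONString(order));
        outboxRepository.save(event);
        return order;
    }

    @Transactional
    public Order payOrder(String orderId, BigDecimal amount) {
        Order order = orderRepository.findById(orderId).orElseThrow();
        // Reject repeated or out-of-sequence payments instead of emitting a second ORDER_PAID
        if (order.getStatus() != OrderStatus.CREATED) {
            throw new IllegalStateException("Invalid status for payment: " + order.getStatus());
        }
        order.setStatus(OrderStatus.PAID);
        orderRepository.save(order);

        // Same pattern: the state change and its outbox event share one transaction
        OutboxEvent event = new OutboxEvent();
        event.setId(UUID.randomUUID().toString());
        event.setAggregateType("Order");
        event.setAggregateId(orderId);
        event.setEventType("ORDER_PAID");
        event.setPayload(JSON.toJSONString(Map.of("orderId", orderId, "amount", amount)));
        outboxRepository.save(event);
        return order;
    }
}
```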

3.2 Debezium Outbox Router

```json
{
  "name": "order-outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "debezium",
    "database.server.id": "184054",
    "database.server.name": "order_service",
    "database.include.list": "order_db",
    "table.include.list": "order_db.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.topic.replacement": "order-events",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregateid",
    "transforms.outbox.table.field.event.type": "eventtype",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.payload.id": "aggregateid"
  }
}
```

These property names follow Debezium 1.x. Debezium 2.x renamed `database.server.name` to `topic.prefix` and `database.history.kafka.*` to `schema.history.internal.kafka.*`, and in either version the MySQL connector also needs its schema-history topic (bootstrap servers plus topic name) configured before it will start. Check the EventRouter options against the Debezium version you actually deploy.

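4. Real-Time Analytics with Kafka Streams

4.1 Order Statistics Stream

```java
import java.time.Duration;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.springframework.kafka.support.serializer.JsonSerde;

StreamsBuilder builder = new StreamsBuilder();

// 1. Read order events (assumes default.key.serde = String and
//    default.value.serde = a GenericAvroSerde configured with the registry URL)
KStream<String, GenericRecord> orderEvents = builder.stream("order-events");

// 2. Count only ORDER_CREATED events, so PAID/CANCELLED events of the same order
//    do not inflate the totals; group by customer in 5-minute tumbling windows
//    with a 1-minute grace period for late records
KTable<Windowed<String>, OrderStats> windowedStats = orderEvents
        .filter((key, event) -> "ORDER_CREATED".equals(event.get("eventType").toString()))
        .groupBy((key, event) -> event.get("customerId").toString())
        .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1)))
        .aggregate(
                OrderStats::new,
                (customerId, event, stats) -> {
                    stats.setOrderCount(stats.getOrderCount() + 1);
                    stats.setTotalAmount(stats.getTotalAmount() + ((Number) event.get("amount")).doubleValue());
                    return stats;
                },
                Materialized.with(Serdes.String(), new JsonSerde<>(OrderStats.class))
        );

// 3. Emit the statistics (every window update is forwarded; the window start is dropped from the key)
windowedStats.toStream()
        .map((windowedKey, stats) -> new KeyValue<>(windowedKey.key(), stats))
        .to("order-stats", Produced.with(Serdes.String(), new JsonSerde<>(OrderStats.class)));

// 4. Anomaly detection: spending above the threshold within 5 minutes.
//    Each further update of a window that stays above the threshold emits another alert;
//    windowedStats.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
//    would emit one alert per window, at the cost of waiting until the window closes.
KStream<String, OrderAlert> alerts = windowedStats
        .toStream()
        .filter((windowedKey, stats) -> stats.getTotalAmount() > 50000)
        .map((windowedKey, stats) -> new KeyValue<>(
                windowedKey.key(),
                new OrderAlert("HIGH_SPENDING", windowedKey.key(), stats.getTotalAmount())
        ));
alerts.to("order-alerts", Produced.with(Serdes.String(), new JsonSerde<>(OrderAlert.class)));
```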
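To see what "5-minute window, 1 minute of grace" means for individual records, the following model assigns epoch-aligned tumbling windows and drops a record once stream time (the largest timestamp seen so far) has passed its window end plus the grace period. It is a simplified sketch of the semantics, not Kafka Streams' implementation; `TumblingWindowSketch` is a hypothetical name.

```java
// Simplified model of 5-minute tumbling windows with a 1-minute grace period.
// Stream time only moves forward: it is the largest record timestamp seen so far.
class TumblingWindowSketch {
    static final long SIZE_MS = 5 * 60_000L;
    static final long GRACE_MS = 60_000L;

    private long streamTime = Long.MIN_VALUE;

    // Start of the epoch-aligned window that contains timestamp ts
    static long windowStart(long ts) {
        return ts - (ts % SIZE_MS);
    }

    // Returns true if the record is aggregated, false if it arrives too late
    boolean accept(long ts) {
        streamTime = Math.max(streamTime, ts);
        long windowEnd = windowStart(ts) + SIZE_MS;  // exclusive end
        return windowEnd + GRACE_MS > streamTime;    // window not yet closed
    }
}
```

With these numbers, an order stamped at minute 4 is still counted while stream time is at minute 5.5, but it is dropped once stream time reaches minute 6; a grace period that is too short silently loses late orders from the statistics.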

4.2 Stream Processing Topology

```mermaid
graph LR
    OE["order-events<br/>Topic"] --> GROUP["GroupBy<br/>customerId"]
    GROUP --> WIN["Windowed<br/>5min Tumbling"]
    WIN --> AGG["Aggregate<br/>OrderStats"]
    AGG --> STATS["order-stats<br/>Topic"]
    AGG --> FILTER["Filter<br/>total > 50000"]
    FILTER --> ALERTS["order-alerts<br/>Topic"]
```

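4.3 Kafka Streams State Stores

Aggregations in Kafka Streams depend on a local state store (RocksDB), which needs sensible configuration:

| Setting | Description | Recommended value |
|---|---|---|
| `state.dir` | Local state store directory | `/data/kafka-streams/state` |
| `num.stream.threads` | Stream threads per instance | At most the number of tasks (about the input partition count, 16 here) divided across instances; extra threads sit idle |
| `cache.max.bytes.buffering` | Record cache size (deprecated in favor of `statestore.cache.max.bytes` since Kafka 3.4) | 10MB |
| `commit.interval.ms` | Commit interval | 1000ms |
| `rocksdb.config.setter` | RocksDB tuning via a `RocksDBConfigSetter` class | Tune as needed |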
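A sketch of these settings as `java.util.Properties`, using the plain string keys so it compiles without the Kafka Streams jar (in an application the result would be passed to `new KafkaStreams(topology, props)`). The application id and bootstrap address are assumptions, the thread count is capped by the tasks each instance can get, and the cache uses `statestore.cache.max.bytes`, the name that supersedes `cache.max.bytes.buffering` from Kafka 3.4 on. `StreamsConfigSketch` is a hypothetical name.

```java
import java.util.Properties;

// Sketch: state-store settings for the order analytics app as plain string keys
class StreamsConfigSketch {
    static Properties orderAnalyticsConfig(int inputPartitions, int instances) {
        Properties props = new Properties();
        props.setProperty("application.id", "order-analytics");     // assumed app id
        props.setProperty("bootstrap.servers", "kafka:9092");       // assumed address
        props.setProperty("state.dir", "/data/kafka-streams/state");
        // Tasks ~ input partitions; threads beyond this instance's share of tasks sit idle
        int tasksPerInstance = (inputPartitions + instances - 1) / instances;  // ceiling division
        int threads = Math.max(1, Math.min(Runtime.getRuntime().availableProcessors(), tasksPerInstance));
        props.setProperty("num.stream.threads", String.valueOf(threads));
        props.setProperty("statestore.cache.max.bytes", String.valueOf(10L * 1024 * 1024)); // 10 MB
        props.setProperty("commit.interval.ms", "1000");
        return props;
    }
}
```

With 16 input partitions spread over 16 instances, each instance gets one task, so a second thread per instance would never do any work.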

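5. Event Sourcing Implementation

5.1 Event Sourcing Architecture

Event Sourcing makes events the single source of truth: instead of storing the current state, it stores every event and rebuilds the state by replaying them:

```mermaid
graph TB
    CMD["Command<br/>Create order"] --> AGG["Order Aggregate<br/>(state rebuilt from events)"]
    AGG -->|"validate + emit events"| EVENTS["Event Store<br/>order-events Topic"]
    EVENTS -->|"replay events"| AGG
    EVENTS -->|"projection"| READ["Read Model<br/>MySQL/ES/Redis"]
    style EVENTS fill:#fff9c4,stroke:#f9a825
```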

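5.2 Aggregate Root Implementation

```java
import java.util.ArrayList;
import java.util.List;

// Events as records under a sealed interface, so the pattern switch in apply()
// is exhaustive (Java 21). OrderShippedEvent's fields are an assumption.
sealed interface OrderEvent permits OrderCreatedEvent, OrderPaidEvent, OrderCancelledEvent, OrderShippedEvent {}
record OrderCreatedEvent(String orderId, String customerId, double amount, List<String> items) implements OrderEvent {}
record OrderPaidEvent(String orderId, double amount) implements OrderEvent {}
record OrderCancelledEvent(String orderId, String reason) implements OrderEvent {}
record OrderShippedEvent(String orderId) implements OrderEvent {}

// Order aggregate root: event sourced
public class OrderAggregate {
    private String orderId;
    private String customerId;
    private double amount;
    private OrderStatus status;
    private List<String> items;
    private final List<OrderEvent> pendingEvents = new ArrayList<>();
    private long version = 0;

    // Rebuild state from the event stream; replayed events are not pending
    public static OrderAggregate fromEvents(List<OrderEvent> events) {
        OrderAggregate aggregate = new OrderAggregate();
        for (OrderEvent event : events) {
            aggregate.apply(event);
            aggregate.version++;
        }
        return aggregate;
    }

    // Command: create order
    public void createOrder(String orderId, String customerId, double amount, List<String> items) {
        if (status != null) throw new IllegalStateException("Order already exists");
        raise(new OrderCreatedEvent(orderId, customerId, amount, items));
    }

    // Command: pay order
    public void payOrder(double paidAmount) {
        if (status != OrderStatus.CREATED) throw new IllegalStateException("Invalid status for payment");
        if (Double.compare(paidAmount, amount) != 0) throw new IllegalArgumentException("Payment amount mismatch");
        raise(new OrderPaidEvent(orderId, paidAmount));
    }

    // Command: cancel order (impossible if never created, already cancelled, or shipped)
    public void cancelOrder(String reason) {
        if (status == null || status == OrderStatus.CANCELLED || status == OrderStatus.SHIPPED)
            throw new IllegalStateException("Cannot cancel order in status " + status);
        raise(new OrderCancelledEvent(orderId, reason));
    }

    // Record a new event and apply it to the in-memory state
    private void raise(OrderEvent event) {
        pendingEvents.add(event);
        apply(event);
    }

    // Event application (state transition): no validation, no side effects
    private void apply(OrderEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> {
                this.orderId = e.orderId();
                this.customerId = e.customerId();
                this.amount = e.amount();
                this.items = e.items();
                this.status = OrderStatus.CREATED;
            }
            case OrderPaidEvent e -> this.status = OrderStatus.PAID;
            case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED;
            case OrderShippedEvent e -> this.status = OrderStatus.SHIPPED;
        }
    }

    public OrderStatus getStatus() { return status; }
    public long getVersion() { return version; }
    public List<OrderEvent> getPendingEvents() { return List.copyOf(pendingEvents); }
    public void clearPendingEvents() { pendingEvents.clear(); }
}
```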

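5.3 Event Store Service

```java
import java.util.List;
import java.util.UUID;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class EventSourcedOrderService {
    private final OrderEventRepository eventRepository;
    private final OutboxEventRepository outboxRepository;

    public EventSourcedOrderService(OrderEventRepository eventRepository,
                                    OutboxEventRepository outboxRepository) {
        this.eventRepository = eventRepository;
        this.outboxRepository = outboxRepository;
    }

    @Transactional
    public Order handleCommand(CreateOrderCommand cmd) {
        // 1. Rebuild the aggregate from the event store
        List<OrderEvent> history = eventRepository.findByOrderId(cmd.getOrderId());
        OrderAggregate aggregate = OrderAggregate.fromEvents(history);

        // 2. Execute the command (produces new events)
        aggregate.createOrder(cmd.getOrderId(), cmd.getCustomerId(),
                cmd.getAmount(), cmd.getItems());

        // 3. In the same transaction, append each new event to the event store (otherwise
        //    the next rebuild would not see it) and to the outbox for publishing to Kafka
        for (Object event : aggregate.getPendingEvents()) {
            eventRepository.append(cmd.getOrderId(), event); // hypothetical event-store append method

            OutboxEvent outbox = new OutboxEvent();
            outbox.setId(UUID.randomUUID().toString());
            outbox.setAggregateType("Order");
            outbox.setAggregateId(cmd.getOrderId());
            outbox.setEventType(eventType(event));
            outbox.setPayload(JSON.toJSONString(event));
            outboxRepository.save(outbox);
        }
        aggregate.clearPendingEvents();
        return aggregate.toOrder(); // toOrder(): mapping helper on the aggregate, not shown
    }

    // Use the ORDER_* names the consumers switch on, not the Java class names
    private static String eventType(Object event) {
        return switch (event) {
            case OrderCreatedEvent e -> "ORDER_CREATED";
            case OrderPaidEvent e -> "ORDER_PAID";
            case OrderCancelledEvent e -> "ORDER_CANCELLED";
            case OrderShippedEvent e -> "ORDER_SHIPPED";
            default -> throw new IllegalArgumentException("Unknown event " + event);
        };
    }
}
```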
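The service above has no guard against two commands for the same order racing each other: both could rebuild the same history and both append. A common remedy in event stores is optimistic concurrency, sketched here in memory: the writer passes the stream version it loaded (the `version` counter that `fromEvents` maintains), and the append fails if another writer got there first. In a relational event store the same check is usually a unique constraint on (aggregate id, version). `InMemoryEventStore` is a hypothetical class, not part of any library.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory event store with optimistic concurrency control (illustrative only)
class InMemoryEventStore {
    private final Map<String, List<String>> streams = new HashMap<>();

    synchronized List<String> load(String aggregateId) {
        return List.copyOf(streams.getOrDefault(aggregateId, List.of()));
    }

    // expectedVersion = number of events the caller saw when it rebuilt the aggregate
    synchronized void append(String aggregateId, long expectedVersion, List<String> newEvents) {
        List<String> stream = streams.computeIfAbsent(aggregateId, k -> new ArrayList<>());
        if (stream.size() != expectedVersion) {
            // Someone else appended in between: the caller should reload and retry the command
            throw new IllegalStateException("Concurrent modification of " + aggregateId
                    + ": expected version " + expectedVersion + " but was " + stream.size());
        }
        stream.addAll(newEvents);
    }
}
```

A rejected append is not an error in the business sense: the command handler reloads the now-longer history, re-runs the command against it, and the aggregate's own checks (for example "Invalid status for payment") decide whether it still applies.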

6. CQRS Read Models

6.1 CQRS Architecture

CQRS (Command Query Responsibility Segregation) separates the write model from the read model: the write model uses event sourcing, and each read model is a projection that builds a query-optimized view from the event stream:

```mermaid
graph TB
    subgraph "Write Model (Command Side)"
        CMD["Command API"] --> AGG["Order Aggregate<br/>(event sourced)"]
        AGG --> OUTBOX["Outbox Table"]
    end
    OUTBOX -->|"Debezium CDC"| KAFKA["Kafka<br/>order-events"]
    subgraph "Read Model (Query Side)"
        KAFKA --> PROJ_MYSQL["MySQL projection<br/>order query table"]
        KAFKA --> PROJ_ES["ES projection<br/>order search"]
        KAFKA --> PROJ_REDIS["Redis projection<br/>order cache"]
    end
    QUERY["Query API"] --> PROJ_MYSQL
    QUERY --> PROJ_ES
    QUERY --> PROJ_REDIS
```

6.2 Projection Implementation

```java
// MySQL projection: builds the order query view
@Service
public class OrderProjection {
    private final JdbcTemplate jdbcTemplate;

    public OrderProjection(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    @KafkaListener(topics = "order-events", groupId = "order-projection-mysql")
    public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) {
        GenericRecord event = record.value();
        String eventType = event.get("eventType").toString();
        String orderId = event.get("orderId").toString();
        switch (eventType) {
            // Avro strings arrive as Utf8, so convert them before passing them to JDBC.
            // A redelivered ORDER_CREATED must not reset a later status, hence the no-op update.
            case "ORDER_CREATED" -> jdbcTemplate.update(
                    "INSERT INTO order_view (id, customer_id, amount, status, items, created_at, updated_at) " +
                    "VALUES (?, ?, ?, 'CREATED', ?, NOW(), NOW()) " +
                    "ON DUPLICATE KEY UPDATE id = id",
                    orderId, event.get("customerId").toString(), event.get("amount"),
                    event.get("items").toString()
            );
            case "ORDER_PAID" -> updateStatus(orderId, "PAID");
            case "ORDER_SHIPPED" -> updateStatus(orderId, "SHIPPED");
            case "ORDER_CANCELLED" -> updateStatus(orderId, "CANCELLED");
            default -> { } // other event types do not affect this view
        }
    }

    private void updateStatus(String orderId, String status) {
        jdbcTemplate.update("UPDATE order_view SET status = ?, updated_at = NOW() WHERE id = ?",
                status, orderId);
    }
}

// Elasticsearch projection: builds the search view
// (RestHighLevelClient is deprecated since Elasticsearch 7.15; new code would
//  normally use the Elasticsearch Java API Client)
@Service
public class OrderSearchProjection {
    private final RestHighLevelClient esClient;

    public OrderSearchProjection(RestHighLevelClient esClient) {
        this.esClient = esClient;
    }

    @KafkaListener(topics = "order-events", groupId = "order-projection-es")
    public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) throws IOException {
        GenericRecord event = record.value();
        String orderId = event.get("orderId").toString();
        // Idempotent upsert keyed by orderId: re-applying the same event yields the same document
        UpdateRequest request = new UpdateRequest("orders", orderId);
        request.doc(event.toString(), XContentType.JSON); // GenericRecord.toString() renders JSON
        request.docAsUpsert(true);  // insert if the document does not exist yet
        request.retryOnConflict(3); // retry on internal version conflicts
        esClient.update(request, RequestOptions.DEFAULT);
    }
}
```
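The upserts above make a redelivered event harmless, but if a consumer group is rewound, replayed older events temporarily overwrite newer state until the stream catches up. One way to guard against that, sketched in memory below, is to store with each document the Kafka offset of the last event applied and skip anything that is not newer. Because orderId is the message key, all events of one order share a partition, so their offsets are comparable. Elasticsearch's index API offers the same rule server-side through `version_type=external`. `VersionedProjection` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

// Version-guarded read model: each document remembers the offset of the last
// event applied to it; stale or redelivered events are ignored.
class VersionedProjection {
    record OrderDoc(String status, long version) {}

    private final Map<String, OrderDoc> docs = new HashMap<>();

    // Returns true if the event changed the read model
    boolean apply(String orderId, String status, long offset) {
        OrderDoc current = docs.get(orderId);
        if (current != null && offset <= current.version()) {
            return false; // duplicate or replayed older event
        }
        docs.put(orderId, new OrderDoc(status, offset));
        return true;
    }

    String statusOf(String orderId) {
        OrderDoc d = docs.get(orderId);
        return d == null ? null : d.status();
    }
}
```

The offset only works as a version within one topic and partition layout; if the topic is recreated or repartitioned, the stored versions must be reset along with the projection.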

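6.3 Read Model Comparison

| Read model | Storage | Suited for | Latency | Consistency |
|---|---|---|---|---|
| MySQL projection | MySQL | Lookup by ID, paginated lists | Milliseconds | Eventually consistent |
| ES projection | Elasticsearch | Full-text search, aggregations | Milliseconds | Eventually consistent |
| Redis projection | Redis | Hot-data cache | Sub-millisecond | Eventually consistent |
| Kafka Streams KTable | RocksDB | Real-time aggregation | Milliseconds | Consistent within the stream topology |

Warning

CQRS read models are eventually consistent: there is a delay between an event being produced and the projection being updated. If the business requires strongly consistent reads, query the write model's database directly instead of a read model. In most scenarios an eventually consistent read model is good enough.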

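7. Downstream Consumers

7.1 Payment Service

```java
@Service
public class PaymentConsumer {
    private final PaymentRepository paymentRepository;

    public PaymentConsumer(PaymentRepository paymentRepository) {
        this.paymentRepository = paymentRepository;
    }

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) {
        GenericRecord event = record.value();
        String eventType = event.get("eventType").toString();
        String orderId = event.get("orderId").toString();
        switch (eventType) {
            case "ORDER_CREATED" -> {
                // Delivery is at-least-once: skip if this order already has a payment
                // (existsByOrderId is an assumed Spring Data derived query)
                if (paymentRepository.existsByOrderId(orderId)) return;
                // Create a pending payment record
                Payment payment = new Payment();
                payment.setOrderId(orderId);
                payment.setAmount(((Number) event.get("amount")).doubleValue());
                payment.setStatus(PaymentStatus.PENDING);
                paymentRepository.save(payment);
            }
            case "ORDER_CANCELLED" -> paymentRepository.cancelByOrderId(orderId); // cancel the payment
            default -> { } // other events do not concern payments
        }
    }
}
```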

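7.2 Inventory Service

```java
@Service
public class InventoryConsumer {
    private final InventoryService inventoryService;

    public InventoryConsumer(InventoryService inventoryService) {
        this.inventoryService = inventoryService;
    }

    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) {
        GenericRecord event = record.value();
        String eventType = event.get("eventType").toString();
        String orderId = event.get("orderId").toString();
        switch (eventType) {
            case "ORDER_CREATED" -> {
                // Lock stock. An Avro array of strings arrives as a List of Utf8,
                // so convert each element instead of casting to List<String>
                List<String> items = ((List<?>) event.get("items")).stream()
                        .map(Object::toString)
                        .toList();
                inventoryService.lockStock(items, orderId);
            }
            case "ORDER_PAID" -> inventoryService.confirmDeduct(orderId);     // confirm the deduction
            case "ORDER_CANCELLED" -> inventoryService.releaseStock(orderId); // release locked stock
            default -> { }
        }
    }
}
```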
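Kafka consumers get at-least-once delivery, so after a rebalance or a retry the same ORDER_CREATED can reach `lockStock` twice. A common guard is to remember which `eventId` values (a field of the OrderEvent schema) have already been handled. The in-memory set below is a simplification; in production it would typically be a processed-events table written in the same database transaction as the side effect. `IdempotentHandler` is a hypothetical name.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Consumer;

// Idempotent consumer sketch: runs the side effect at most once per eventId
class IdempotentHandler {
    private final Set<String> processedEventIds = new HashSet<>();
    private final Consumer<String> sideEffect;

    IdempotentHandler(Consumer<String> sideEffect) {
        this.sideEffect = sideEffect;
    }

    // Returns true if the event was processed, false if it was a duplicate
    boolean handle(String eventId, String orderId) {
        if (processedEventIds.contains(eventId)) {
            return false;
        }
        sideEffect.accept(orderId);
        // Mark only after success, so a failed attempt is retried on redelivery
        processedEventIds.add(eventId);
        return true;
    }
}
```

Marking after the side effect leaves a small window in which a crash causes a second execution; putting the marker and the side effect in one database transaction is what closes it.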

8. Full Deployment with Docker Compose

8.1 The Complete Stack

```yaml
version: "3.8"
services:
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: order_db
      # MYSQL_USER is only granted privileges on MYSQL_DATABASE; Debezium also needs
      # SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE and REPLICATION CLIENT granted globally
      MYSQL_USER: debezium
      MYSQL_PASSWORD: debezium
    command: >
      --server-id=1
      --log-bin=mysql-bin
      --binlog-format=ROW
      --binlog-row-image=FULL
      --binlog-expire-logs-seconds=604800
    volumes:
      - mysql-data:/var/lib/mysql
    ports:
      - "3306:3306"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  # A single broker for local development, hence replication factor 1 everywhere;
  # the 3-broker cluster and 3 replicas from the topic design are the production setup
  kafka:
    image: confluentinc/cp-kafka:7.6.0
    depends_on: [zookeeper]
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Other containers connect via kafka:9092; clients on the host use localhost:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"
      KAFKA_LOG_RETENTION_HOURS: 168
      KAFKA_NUM_PARTITIONS: 16
    volumes:
      - kafka-data:/var/lib/kafka/data
    ports:
      - "29092:29092"

  schema-registry:
    image: confluentinc/cp-schema-registry:7.6.0
    depends_on: [kafka]
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
    ports:
      - "8081:8081"

  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.6.0
    depends_on: [kafka, mysql]
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: order-connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-statuses
      # Internal topics default to replication factor 3, which a single broker cannot satisfy
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars
    volumes:
      - ./debezium-plugins:/etc/kafka-connect/jars
    ports:
      - "8083:8083"

  elasticsearch:
    image: elasticsearch:8.12.0
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    volumes:
      - es-data:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"

  prometheus:
    image: prom/prometheus:v2.50.0
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml
    ports:
      - "9090:9090"

  grafana:
    image: grafana/grafana:10.3.0
    depends_on: [prometheus]
    volumes:
      - ./monitoring/grafana-dashboards:/etc/grafana/provisioning/dashboards
    ports:
      - "3000:3000"

volumes:
  mysql-data:
  kafka-data:
  es-data:
```

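9. Monitoring and Operations

9.1 Key Metrics

| Metric | Source | Alert condition |
|---|---|---|
| Consumer lag | Kafka | > 100,000 messages |
| Schema versions | Schema Registry | Incompatible change |
| Outbox backlog | MySQL | > 1,000 messages pending |
| Debezium latency | Kafka Connect | > 5 s |
| Streams processing rate | Kafka Streams | Sudden drop |
| Order creation rate | Order Service | Sudden 3x spike |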
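Consumer lag, the first metric in the table, is derived per partition as the log-end offset minus the consumer group's committed offset; dashboards show either the sum over partitions or the per-partition maximum (the alert rule in 9.2 uses a maximum). A stdlib sketch of that arithmetic follows, with offsets passed in as plain maps where a real setup would read them from an admin client or an exporter. `ConsumerLagSketch` is a hypothetical name, and treating a partition without a committed offset as offset 0 is an assumption (it matches `auto.offset.reset=earliest`).

```java
import java.util.Map;

// Consumer lag arithmetic: per partition, log-end offset minus committed offset
class ConsumerLagSketch {
    static final long ALERT_THRESHOLD = 100_000L;

    // Clamp at 0: offsets read at slightly different moments can briefly cross
    static long lag(long endOffset, long committedOffset) {
        return Math.max(0L, endOffset - committedOffset);
    }

    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long total = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            total += lag(e.getValue(), committed.getOrDefault(e.getKey(), 0L));
        }
        return total;
    }

    static long maxLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        long max = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            max = Math.max(max, lag(e.getValue(), committed.getOrDefault(e.getKey(), 0L)));
        }
        return max;
    }

    static boolean shouldAlert(Map<Integer, Long> endOffsets, Map<Integer, Long> committed) {
        return maxLag(endOffsets, committed) > ALERT_THRESHOLD;
    }
}
```

A large sum with a small maximum points to lag spread across many partitions, while a large maximum usually means one slow or stuck partition.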

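9.2 Alert Rules

```yaml
# Metric names depend on the exporters in use (JMX exporter rules, kafka-exporter,
# Debezium and Schema Registry JMX metrics); adjust the expressions to the series
# your Prometheus actually scrapes.
groups:
  - name: order-system
    rules:
      - alert: ConsumerLagHigh
        expr: kafka_consumer_consumer_lag_records_lag_max > 100000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Consumer lag above 100,000 messages"
      - alert: DebeziumLagHigh
        expr: debezium_connector_source_lag_seconds > 5
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Debezium lag above 5 seconds"
      - alert: SchemaIncompatible
        expr: schema_registry_compatibility_check_failed > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Incompatible schema change"
```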

10. Testing Strategy

10.1 Test Pyramid

```mermaid
graph TB
    E2E["End-to-end tests<br/>Full pipeline on Docker Compose<br/>Few, slow"]
    INT["Integration tests<br/>Embedded Kafka + Testcontainers<br/>Moderate number"]
    UNIT["Unit tests<br/>Aggregate + projection logic<br/>Many, fast"]
    UNIT --> INT --> E2E
    style UNIT fill:#c8e6c9,stroke:#2e7d32
    style INT fill:#fff9c4,stroke:#f9a825
    style E2E fill:#ffcdd2,stroke:#c62828
```

10.2 Unit Tests: Aggregate Root

```java
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.List;
import org.junit.jupiter.api.Test;

class OrderAggregateTest {
    @Test
    void shouldCreateOrder() {
        OrderAggregate aggregate = new OrderAggregate();
        aggregate.createOrder("ORD-1", "CUST-1", 100.0, List.of("ITEM-1"));
        assertEquals(OrderStatus.CREATED, aggregate.getStatus());
        assertEquals(1, aggregate.getPendingEvents().size());
        assertTrue(aggregate.getPendingEvents().get(0) instanceof OrderCreatedEvent);
    }

    @Test
    void shouldRebuildFromEvents() {
        List<OrderEvent> events = List.of(
                new OrderCreatedEvent("ORD-1", "CUST-1", 100.0, List.of("ITEM-1")),
                new OrderPaidEvent("ORD-1", 100.0)
        );
        OrderAggregate aggregate = OrderAggregate.fromEvents(events);
        assertEquals(OrderStatus.PAID, aggregate.getStatus());
        assertEquals(0, aggregate.getPendingEvents().size()); // replaying must not produce new events
    }

    @Test
    void shouldRejectDuplicatePayment() {
        OrderAggregate aggregate = new OrderAggregate();
        aggregate.createOrder("ORD-1", "CUST-1", 100.0, List.of("ITEM-1"));
        aggregate.payOrder(100.0);
        // The second payment fails because the status is no longer CREATED
        assertThrows(IllegalStateException.class, () -> aggregate.payOrder(100.0));
    }
}
```

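10.3 Integration Test: Embedded Kafka

```java
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Map;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

@SpringBootTest
// bootstrapServersProperty points the application's Kafka clients at the embedded broker
@EmbeddedKafka(partitions = 1, topics = {"order-events"},
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class OrderProjectionTest {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @Autowired
    private OrderViewRepository orderViewRepository; // read-model repository (assumed)

    @Test
    void shouldProjectOrderCreatedEvent() throws Exception {
        // Send an ORDER_CREATED event. A JSON payload assumes the test profile gives the
        // listener a matching JSON deserializer; against the production Avro listener, send a
        // GenericRecord through an Avro serializer backed by a mock schema registry instead.
        String event = JSON.toJSONString(Map.of(
                "eventType", "ORDER_CREATED",
                "orderId", "ORD-1",
                "customerId", "CUST-1",
                "amount", 100.0
        ));
        kafkaTemplate.send("order-events", "ORD-1", event).get();

        // Poll the read model until the projection catches up, instead of a fixed sleep
        await().atMost(10, SECONDS).untilAsserted(() -> {
            OrderView view = orderViewRepository.findById("ORD-1").orElseThrow();
            assertEquals("CUST-1", view.getCustomerId());
            assertEquals(OrderStatus.CREATED, view.getStatus());
        });
    }
}
```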

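10.4 End-to-End Test: Testcontainers

```java
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

import org.junit.jupiter.api.Test;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
class OrderSystemE2ETest {
    @Container
    static final KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.6.0"));

    @Container
    static final MySQLContainer<?> mysql = new MySQLContainer<>(
            DockerImageName.parse("mysql:8.0")).withCommand("--binlog-format=ROW");

    // orderService, request, kafkaConsumer, inventoryService and notificationService
    // are test fixtures wired to these containers (setup not shown)

    @Test
    void shouldCompleteOrderFlow() {
        // 1. Create the order → the event reaches order-events
        Order order = orderService.createOrder(request);
        await().atMost(5, SECONDS).until(() ->
                kafkaConsumer.records("order-events").size() >= 1);

        // 2. The inventory service locks stock for the new order
        await().atMost(5, SECONDS).until(() ->
                inventoryService.getLockedQuantity(order.getId()) > 0);

        // 3. Pay the order → a notification is sent
        orderService.payOrder(order.getId(), order.getAmount());
        await().atMost(5, SECONDS).until(() ->
                notificationService.getSentCount(order.getId()) >= 1);
    }
}
```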

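10.5 Test Strategy Comparison

| Test type | Scope | Speed | Quantity | Key tools |
|---|---|---|---|---|
| Unit tests | Aggregate logic | Milliseconds | Many | JUnit 5 |
| Integration tests | Projections + consumers | Seconds | Moderate | Embedded Kafka |
| Contract tests | Schema compatibility | Milliseconds | On every change | Schema Registry API |
| End-to-end tests | Full pipeline | Minutes | Few | Testcontainers |
| Fault injection | Fault tolerance and recovery | Minutes | Key scenarios | Chaos Monkey |

Tip

The key to testing an event-driven system is in the name: test the event flows, not the HTTP endpoints. Unit tests verify how the aggregate produces events, integration tests verify that projections are correct, and end-to-end tests verify event delivery across the whole pipeline. Schema compatibility tests should run automatically in CI.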
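As a sketch of that CI step, the program below asks Schema Registry whether a candidate schema is compatible with the latest registered version (`POST /compatibility/subjects/{subject}/versions/latest`, which answers with `{"is_compatible": true|false}`) and exits non-zero otherwise. The registry URL, the subject name and the `.avsc` path argument are assumptions, the response is parsed with a regex to avoid a JSON dependency, and `SchemaCompatibilityCheck` is a hypothetical name.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// CI gate sketch: fail the build if the candidate schema breaks compatibility
class SchemaCompatibilityCheck {
    private static final Pattern IS_COMPATIBLE =
            Pattern.compile("\"is_compatible\"\\s*:\\s*(true|false)");

    // The candidate schema travels as an escaped JSON string inside {"schema": "..."}
    static String body(String avroSchemaJson) {
        String escaped = avroSchemaJson.replace("\\", "\\\\").replace("\"", "\\\"")
                .replace("\n", "\\n").replace("\r", "\\r").replace("\t", "\\t");
        return "{\"schema\":\"" + escaped + "\"}";
    }

    static HttpRequest request(String registryUrl, String subject, String avroSchemaJson) {
        URI uri = URI.create(registryUrl + "/compatibility/subjects/" + subject + "/versions/latest");
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .POST(HttpRequest.BodyPublishers.ofString(body(avroSchemaJson)))
                .build();
    }

    // Any other reply (e.g. an error for an unknown subject) also fails the check
    static boolean isCompatible(String responseBody) {
        Matcher m = IS_COMPATIBLE.matcher(responseBody);
        if (!m.find()) {
            throw new IllegalArgumentException("Unexpected registry response: " + responseBody);
        }
        return Boolean.parseBoolean(m.group(1));
    }

    // Usage in CI: java SchemaCompatibilityCheck.java path/to/OrderEvent.avsc
    public static void main(String[] args) throws Exception {
        String schema = Files.readString(Path.of(args[0]));
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                request("http://localhost:8081", "order-events-value", schema),
                HttpResponse.BodyHandlers.ofString());
        if (!isCompatible(response.body())) {
            System.err.println("Schema is not compatible with the latest registered version");
            System.exit(1);
        }
    }
}
```

Running the check before registration keeps incompatible changes out of the registry entirely, instead of discovering them when a producer fails in production.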

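11. Failure Drills

11.1 Failure Scenarios and Recovery

| Failure | Impact | Recovery |
|---|---|---|
| Kafka broker down | Writes to partitions led by that broker pause briefly | A new leader is elected automatically from the ISR |
| Incompatible schema | Producers fail to send | Roll back the schema or register a new, compatible version |
| Debezium stopped | Outbox rows pile up | Restart the connector; it resumes from its recorded binlog position as long as the binlog is still retained |
| DB primary/replica failover | Brief unavailability | Retries in the application layer |
| Consumer OOM | Consumption stops | Restart the consumer; it continues from the last committed offset |
| Projection lag | Read model out of date | Monitor lag; rebuild the projection if necessary |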

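11.2 Rebuilding Projections

When the read model diverges from the write model (for example, a projection bug wrote wrong data), the projection has to be rebuilt: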
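A minimal in-memory sketch of a blue/green rebuild: replay the complete history through the fixed projection logic into a fresh view, then switch queries to it in one step, so readers never see a half-built table. With Kafka this corresponds to a new consumer group starting from the earliest offset and writing into a new table before reads are switched over; it only works if the topic still holds the full history, which the 7-day retention of order-events in the topic design does not guarantee. `ProjectionRebuildSketch` and its `Event` record are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Blue/green projection rebuild: build a shadow view, then swap it in atomically
class ProjectionRebuildSketch {
    record Event(String orderId, String type) {}

    // Queries always read whichever view this reference currently points at
    private final AtomicReference<Map<String, String>> liveView = new AtomicReference<>(Map.of());

    // The (fixed) projection logic: fold events into orderId -> status
    static Map<String, String> replay(List<Event> history) {
        Map<String, String> view = new HashMap<>();
        for (Event e : history) {
            switch (e.type()) {
                case "ORDER_CREATED" -> view.putIfAbsent(e.orderId(), "CREATED");
                case "ORDER_PAID" -> view.put(e.orderId(), "PAID");
                case "ORDER_SHIPPED" -> view.put(e.orderId(), "SHIPPED");
                case "ORDER_CANCELLED" -> view.put(e.orderId(), "CANCELLED");
                default -> { } // unknown event types are skipped
            }
        }
        return view;
    }

    void rebuild(List<Event> fullHistory) {
        Map<String, String> shadow = replay(fullHistory); // old view keeps serving meanwhile
        liveView.set(Map.copyOf(shadow));                 // switch reads in one step
    }

    String status(String orderId) {
        return liveView.get().get(orderId);
    }
}
```

When Kafka retention is shorter than the history you need, the replay source has to be the event store table (or a compacted or long-retention topic) rather than order-events itself.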

Warning

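Run failure drills regularly: simulate each failure scenario in a non-production environment and verify that the system recovers. In particular: 1) leader failover when a Kafka broker fails; 2) the rollback procedure for an incompatible schema; 3) handling the outbox backlog when Debezium fails; 4) the rebuild procedure after a projection bug.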

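12. Summary

The previous chapter covered consistency between messages and the database. This chapter put all of the pieces together:

| Dimension | Key points |
|---|---|
| System architecture | Kafka + Schema Registry + Debezium + Kafka Streams |
| Schema management | Avro schemas + Schema Registry, backward-compatibility policy |
| Consistency | Transactional outbox + Debezium CDC instead of dual writes |
| Event sourcing | Aggregate root + event store, events as the single source of truth |
| CQRS | Write model (event sourcing) + read models (projections), eventually consistent |
| Stream processing | Kafka Streams windowed aggregation + anomaly detection |
| Ordering | orderId as the message key keeps each order's events in order |
| Monitoring | Consumer lag + Debezium latency + schema versions |
| Testing | Unit (aggregate) + integration (projections) + E2E (full pipeline) |
| Failure drills | Regularly simulate failures to verify recovery |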
Tip

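The core of building an event-driven system is not which message queue you pick, but getting three key pieces right: 1) consistency between messages and the database (the outbox pattern); 2) version management of message formats (Schema Registry); 3) stream processing of messages (Kafka Streams). Get these three right, and almost any mainstream message queue can carry your system.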

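That concludes the series. Starting from the messaging landscape, we covered message semantics, Kafka architecture and storage, RabbitMQ's smart routing, RocketMQ transactional messages, Pulsar's layered architecture, message ordering, backlog and backpressure, event sourcing and CQRS, Schema evolution, messaging technology selection and database consistency, and finally built a complete event-driven system. I hope the series helps you understand the essentials of messaging system design.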


https://blog.souloss.com/posts/messaging/messaging-hands-on-practice/
Author
Souloss
Published
2026-05-16
License
CC BY-NC-SA 4.0

Some information may be out of date.