前面 15 章逐个拆解了消息语义、Kafka 架构、存储引擎、可靠性、流处理、四大系统对比、有序性、积压、事件溯源、Schema 演化、选型、一致性——现在是时候把它们组装成一个完整的系统了。要构建一个电商订单事件驱动系统,用 Kafka + Schema Registry + Kafka Streams + 事务性发件箱,把零散的知识点焊成一个可运行的架构。
一、系统设计
1.1 业务场景
要构建一个电商订单系统,核心需求:
1.2 系统架构
1.3 Topic 设计
| Topic | 分区数 | 副本数 | 保留时间 | Schema |
|---|---|---|---|---|
| order-events | 16 | 3 | 7 天 | Avro (OrderEvent) |
| order-stats | 8 | 3 | 30 天 | Avro (OrderStats) |
| payment-events | 8 | 3 | 7 天 | Avro (PaymentEvent) |
| inventory-events | 8 | 3 | 7 天 | Avro (InventoryEvent) |
| order-alerts | 4 | 3 | 3 天 | Avro (OrderAlert) |
二、Schema 定义与注册
2.1 Schema 注册与兼容性
Schema 注册是系统上线前的第一步——先定义 Schema,再开发 Producer 和 Consumer。这确保了所有参与者对消息格式有统一的理解。
2.2 Schema 演化实战
当需要给 OrderEvent 添加新字段时,必须保证向后兼容:
{ "type": "record", "name": "OrderEvent", "namespace": "com.example.order", "fields": [ {"name": "eventId", "type": "string"}, {"name": "eventType", "type": {"type": "enum", "name": "OrderEventType", "symbols": ["CREATED", "PAID", "SHIPPED", "CANCELLED", "REFUNDED"]}}, {"name": "orderId", "type": "string"}, {"name": "customerId", "type": "string"}, {"name": "amount", "type": "double"}, {"name": "currency", "type": "string", "default": "CNY"}, {"name": "items", "type": {"type": "array", "items": "string"}, "default": []}, {"name": "timestamp", "type": "long"}, {"name": "version", "type": "int", "default": 1}, {"name": "shippingAddress", "type": ["null", "string"], "default": null}, {"name": "priority", "type": "int", "default": 0} ]}新增字段必须有默认值——旧 Consumer 读新消息时,新字段用默认值填充;新 Consumer 读旧消息时,新字段也用默认值填充。这就是向后兼容的核心。
三、事务性发件箱实现
3.1 Order Service 实现
@Servicepublic class OrderService { private final OrderRepository orderRepository; private final OutboxEventRepository outboxRepository; @Transactional public Order createOrder(CreateOrderRequest request) { // 1. 创建订单 Order order = new Order(); order.setId(UUID.randomUUID().toString()); order.setCustomerId(request.getCustomerId()); order.setAmount(request.getAmount()); order.setStatus(OrderStatus.CREATED); order.setItems(request.getItems()); orderRepository.save(order); // 2. 写入 Outbox(同一事务) OutboxEvent event = new OutboxEvent(); event.setId(UUID.randomUUID().toString()); event.setAggregateType("Order"); event.setAggregateId(order.getId()); event.setEventType("ORDER_CREATED"); event.setPayload(JSON.toJSONString(order)); outboxRepository.save(event); return order; } @Transactional public Order payOrder(String orderId, BigDecimal amount) { Order order = orderRepository.findById(orderId).orElseThrow(); order.setStatus(OrderStatus.PAID); orderRepository.save(order); OutboxEvent event = new OutboxEvent(); event.setId(UUID.randomUUID().toString()); event.setAggregateType("Order"); event.setAggregateId(orderId); event.setEventType("ORDER_PAID"); event.setPayload(JSON.toJSONString(Map.of("orderId", orderId, "amount", amount))); outboxRepository.save(event); return order; }}3.2 Debezium Outbox Router
{ "name": "order-outbox-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "debezium", "database.server.id": "184054", "database.server.name": "order_service", "database.include.list": "order_db", "table.include.list": "order_db.outbox_events", "transforms": "outbox", "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter", "transforms.outbox.route.topic.replacement": "order-events", "transforms.outbox.table.field.event.id": "id", "transforms.outbox.table.field.event.key": "aggregateid", "transforms.outbox.table.field.event.type": "eventtype", "transforms.outbox.table.field.event.payload": "payload", "transforms.outbox.table.field.event.payload.id": "aggregateid" }}四、Kafka Streams 实时分析
4.1 订单统计流
StreamsBuilder builder = new StreamsBuilder();// 1. 读取订单事件KStream<String, GenericRecord> orderEvents = builder.stream("order-events");// 2. 按客户分组,5分钟翻滚窗口统计KTable<Windowed<String>, OrderStats> windowedStats = orderEvents .groupBy((key, event) -> event.get("customerId").toString()) .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))) .aggregate( OrderStats::new, (customerId, event, stats) -> { stats.setOrderCount(stats.getOrderCount() + 1); stats.setTotalAmount(stats.getTotalAmount() + (double) event.get("amount")); return stats; }, Materialized.with(Serdes.String(), new JsonSerde<>(OrderStats.class)) );// 3. 输出统计结果windowedStats.toStream() .map((windowedKey, stats) -> new KeyValue<>(windowedKey.key(), stats)) .to("order-stats", Produced.with(Serdes.String(), new JsonSerde<>(OrderStats.class)));// 4. 异常检测:5分钟内消费超过阈值KStream<String, OrderAlert> alerts = windowedStats .toStream() .filter((windowedKey, stats) -> stats.getTotalAmount() > 50000) .map((windowedKey, stats) -> new KeyValue<>( windowedKey.key(), new OrderAlert("HIGH_SPENDING", windowedKey.key(), stats.getTotalAmount()) ));alerts.to("order-alerts", Produced.with(Serdes.String(), new JsonSerde<>(OrderAlert.class)));4.2 流处理拓扑
4.3 Kafka Streams 状态存储
Kafka Streams 的聚合操作依赖本地状态存储(RocksDB),需要合理配置:
| 配置项 | 说明 | 推荐值 |
|---|---|---|
state.dir | 本地状态存储目录 | /data/kafka-streams/state |
num.stream.threads | 流处理线程数 | Broker 数量 |
cache.max.bytes.buffering | 记录缓存大小 | 10MB |
commit.interval.ms | 提交间隔 | 1000ms |
rocksdb.config.setter | RocksDB 调优 | 按需调整 |
五、事件溯源实现
5.1 事件溯源架构
事件溯源(Event Sourcing)把事件作为数据的唯一来源——不存储当前状态,而是存储所有事件,通过回放事件重建状态:
5.2 聚合根实现
// 订单聚合根:事件溯源public class OrderAggregate { private String orderId; private String customerId; private double amount; private OrderStatus status; private List<String> items; private List<Object> pendingEvents = new ArrayList<>(); private long version = 0; // 从事件流重建状态 public static OrderAggregate fromEvents(List<OrderEvent> events) { OrderAggregate aggregate = new OrderAggregate(); for (OrderEvent event : events) { aggregate.apply(event); aggregate.version++; } return aggregate; } // Command:创建订单 public void createOrder(String orderId, String customerId, double amount, List<String> items) { if (status != null) throw new IllegalStateException("Order already exists"); OrderCreatedEvent event = new OrderCreatedEvent(orderId, customerId, amount, items); pendingEvents.add(event); apply(event); } // Command:支付订单 public void payOrder(double paidAmount) { if (status != OrderStatus.CREATED) throw new IllegalStateException("Invalid status for payment"); if (paidAmount != amount) throw new IllegalArgumentException("Payment amount mismatch"); OrderPaidEvent event = new OrderPaidEvent(orderId, paidAmount); pendingEvents.add(event); apply(event); } // Command:取消订单 public void cancelOrder(String reason) { if (status == OrderStatus.SHIPPED) throw new IllegalStateException("Cannot cancel shipped order"); OrderCancelledEvent event = new OrderCancelledEvent(orderId, reason); pendingEvents.add(event); apply(event); } // 事件应用(状态转移) private void apply(OrderEvent event) { switch (event) { case OrderCreatedEvent e -> { this.orderId = e.orderId(); this.customerId = e.customerId(); this.amount = e.amount(); this.items = e.items(); this.status = OrderStatus.CREATED; } case OrderPaidEvent e -> this.status = OrderStatus.PAID; case OrderCancelledEvent e -> this.status = OrderStatus.CANCELLED; case OrderShippedEvent e -> this.status = OrderStatus.SHIPPED; } } public List<Object> getPendingEvents() { return pendingEvents; } public void clearPendingEvents() { pendingEvents.clear(); }}5.3 事件存储服务
@Servicepublic class EventSourcedOrderService { private final KafkaTemplate<String, String> kafkaTemplate; private final OrderEventRepository eventRepository; @Transactional public Order handleCommand(CreateOrderCommand cmd) { // 1. 从事件存储重建聚合根 List<OrderEvent> history = eventRepository.findByOrderId(cmd.getOrderId()); OrderAggregate aggregate = OrderAggregate.fromEvents(history); // 2. 执行命令(产生新事件) aggregate.createOrder(cmd.getOrderId(), cmd.getCustomerId(), cmd.getAmount(), cmd.getItems()); // 3. 持久化新事件到 Outbox(保证一致性) for (Object event : aggregate.getPendingEvents()) { OutboxEvent outbox = new OutboxEvent(); outbox.setId(UUID.randomUUID().toString()); outbox.setAggregateType("Order"); outbox.setAggregateId(cmd.getOrderId()); outbox.setEventType(event.getClass().getSimpleName()); outbox.setPayload(JSON.toJSONString(event)); outboxRepository.save(outbox); } aggregate.clearPendingEvents(); return aggregate.toOrder(); }}六、CQRS 读模型
6.1 CQRS 架构
CQRS(Command Query Responsibility Segregation)把写模型和读模型分离——写模型使用事件溯源,读模型使用投影(Projection)从事件流构建优化的查询视图:
6.2 投影实现
// MySQL 投影:构建订单查询视图@Servicepublic class OrderProjection { private final JdbcTemplate jdbcTemplate; @KafkaListener(topics = "order-events", groupId = "order-projection-mysql") public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) { GenericRecord event = record.value(); String eventType = event.get("eventType").toString(); String orderId = event.get("orderId").toString(); switch (eventType) { case "ORDER_CREATED" -> jdbcTemplate.update( "INSERT INTO order_view (id, customer_id, amount, status, items, created_at, updated_at) " + "VALUES (?, ?, ?, 'CREATED', ?, NOW(), NOW()) " + "ON DUPLICATE KEY UPDATE status='CREATED', updated_at=NOW()", orderId, event.get("customerId"), event.get("amount"), event.get("items").toString() ); case "ORDER_PAID" -> jdbcTemplate.update( "UPDATE order_view SET status='PAID', updated_at=NOW() WHERE id=?", orderId ); case "ORDER_SHIPPED" -> jdbcTemplate.update( "UPDATE order_view SET status='SHIPPED', updated_at=NOW() WHERE id=?", orderId ); case "ORDER_CANCELLED" -> jdbcTemplate.update( "UPDATE order_view SET status='CANCELLED', updated_at=NOW() WHERE id=?", orderId ); } }}// Elasticsearch 投影:构建搜索视图@Servicepublic class OrderSearchProjection { private final RestHighLevelClient esClient; @KafkaListener(topics = "order-events", groupId = "order-projection-es") public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) { GenericRecord event = record.value(); String orderId = event.get("orderId").toString(); // 幂等更新:使用事件 ID 作为版本号 UpdateRequest request = new UpdateRequest("orders", orderId); request.doc(JSON.toJSONString(event), XContentType.JSON); request.docAsUpsert(true); // 不存在则插入 request.retryOnConflict(3); // 版本冲突重试 esClient.update(request, RequestOptions.DEFAULT); }}6.3 读模型对比
| 读模型 | 数据存储 | 适用查询 | 延迟 | 一致性 |
|---|---|---|---|---|
| MySQL 投影 | MySQL | 按 ID 查询、列表分页 | 毫秒级 | 最终一致 |
| ES 投影 | Elasticsearch | 全文搜索、聚合分析 | 毫秒级 | 最终一致 |
| Redis 投影 | Redis | 热点数据缓存 | 微秒级 | 最终一致 |
| Kafka Streams KTable | RocksDB | 实时聚合 | 毫秒级 | 强一致(流内) |
CQRS 的读模型是最终一致的——事件产生到投影更新之间有延迟。如果业务要求强一致读取,应该直接查写模型的数据库,而不是读模型。大多数场景下,最终一致的读模型已经够用。
七、下游服务消费
7.1 Payment Service
@Servicepublic class PaymentConsumer { @KafkaListener(topics = "order-events", groupId = "payment-service") public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) { GenericRecord event = record.value(); String eventType = event.get("eventType").toString(); String orderId = event.get("orderId").toString(); switch (eventType) { case "ORDER_CREATED" -> { // 创建待支付记录 Payment payment = new Payment(); payment.setOrderId(orderId); payment.setAmount((double) event.get("amount")); payment.setStatus(PaymentStatus.PENDING); paymentRepository.save(payment); } case "ORDER_CANCELLED" -> { // 取消支付 paymentRepository.cancelByOrderId(orderId); } } }}7.2 Inventory Service
@Servicepublic class InventoryConsumer { @KafkaListener(topics = "order-events", groupId = "inventory-service") public void handleOrderEvent(ConsumerRecord<String, GenericRecord> record) { GenericRecord event = record.value(); String eventType = event.get("eventType").toString(); String orderId = event.get("orderId").toString(); switch (eventType) { case "ORDER_CREATED" -> { // 锁定库存 List<String> items = (List<String>) event.get("items"); inventoryService.lockStock(items, orderId); } case "ORDER_PAID" -> { // 确认扣减库存 inventoryService.confirmDeduct(orderId); } case "ORDER_CANCELLED" -> { // 释放库存 inventoryService.releaseStock(orderId); } } }}八、Docker Compose 完整部署
8.1 完整技术栈
version: "3.8"services: mysql: image: mysql:8.0 environment: MYSQL_ROOT_PASSWORD: root MYSQL_DATABASE: order_db MYSQL_USER: debezium MYSQL_PASSWORD: debezium command: > --server-id=1 --log-bin=mysql-bin --binlog-format=ROW --binlog-row-image=FULL --expire-logs-days=7 volumes: - mysql-data:/var/lib/mysql ports: - "3306:3306" zookeeper: image: confluentinc/cp-zookeeper:7.6.0 environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 ports: - "2181:2181" kafka: image: confluentinc/cp-kafka:7.6.0 depends_on: [zookeeper] environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" KAFKA_LOG_RETENTION_HOURS: 168 KAFKA_NUM_PARTITIONS: 16 volumes: - kafka-data:/var/lib/kafka/data ports: - "9092:9092" schema-registry: image: confluentinc/cp-schema-registry:7.6.0 depends_on: [kafka] environment: SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:9092 SCHEMA_REGISTRY_HOST_NAME: schema-registry ports: - "8081:8081" kafka-connect: image: confluentinc/cp-kafka-connect:7.6.0 depends_on: [kafka, mysql] environment: CONNECT_BOOTSTRAP_SERVERS: kafka:9092 CONNECT_REST_PORT: 8083 CONNECT_GROUP_ID: order-connect-cluster CONNECT_CONFIG_STORAGE_TOPIC: connect-configs CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets CONNECT_STATUS_STORAGE_TOPIC: connect-statuses CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081 CONNECT_PLUGIN_PATH: /usr/share/java,/etc/kafka-connect/jars volumes: - ./debezium-plugins:/etc/kafka-connect/jars ports: - "8083:8083" elasticsearch: image: elasticsearch:8.12.0 environment: - discovery.type=single-node - xpack.security.enabled=false - "ES_JAVA_OPTS=-Xms512m -Xmx512m" volumes: - es-data:/usr/share/elasticsearch/data ports: - "9200:9200" prometheus: image: prom/prometheus:v2.50.0 volumes: - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml ports: - "9090:9090" grafana: image: grafana/grafana:10.3.0 depends_on: [prometheus] volumes: - ./monitoring/grafana-dashboards:/etc/grafana/provisioning/dashboards ports: - "3000:3000"volumes: mysql-data: kafka-data: es-data:九、监控与运维
9.1 关键监控指标
| 指标 | 来源 | 告警条件 |
|---|---|---|
| Consumer Lag | Kafka | > 10万条 |
| Schema 版本 | Schema Registry | 不兼容变更 |
| Outbox 堆积 | MySQL | 待发送消息 > 1000 |
| Debezium 延迟 | Kafka Connect | > 5s |
| Streams 处理速率 | Kafka Streams | 突然下降 |
| 订单创建速率 | Order Service | 突增 3x |
9.2 告警规则
groups: - name: order-system rules: - alert: ConsumerLagHigh expr: kafka_consumer_consumer_lag_records_lag_max > 100000 for: 5m labels: severity: warning annotations: summary: "Consumer lag 超过 10 万条" - alert: DebeziumLagHigh expr: debezium_connector_source_lag_seconds > 5 for: 2m labels: severity: critical annotations: summary: "Debezium 延迟超过 5 秒" - alert: SchemaIncompatible expr: schema_registry_compatibility_check_failed > 0 for: 1m labels: severity: critical annotations: summary: "Schema 不兼容变更"十、测试策略
10.1 测试金字塔
10.2 单元测试:聚合根
class OrderAggregateTest { @Test void shouldCreateOrder() { OrderAggregate aggregate = new OrderAggregate(); aggregate.createOrder("ORD-1", "CUST-1", 100.0, List.of("ITEM-1")); assertEquals(OrderStatus.CREATED, aggregate.getStatus()); assertEquals(1, aggregate.getPendingEvents().size()); assertTrue(aggregate.getPendingEvents().get(0) instanceof OrderCreatedEvent); } @Test void shouldRebuildFromEvents() { List<OrderEvent> events = List.of( new OrderCreatedEvent("ORD-1", "CUST-1", 100.0, List.of("ITEM-1")), new OrderPaidEvent("ORD-1", 100.0) ); OrderAggregate aggregate = OrderAggregate.fromEvents(events); assertEquals(OrderStatus.PAID, aggregate.getStatus()); assertEquals(0, aggregate.getPendingEvents().size()); // 重建不产生新事件 } @Test void shouldRejectDuplicatePayment() { OrderAggregate aggregate = new OrderAggregate(); aggregate.createOrder("ORD-1", "CUST-1", 100.0, List.of("ITEM-1")); aggregate.payOrder(100.0); assertThrows(IllegalStateException.class, () -> aggregate.payOrder(100.0)); }}10.3 集成测试:Embedded Kafka
@SpringBootTest@EmbeddedKafka(partitions = 1, topics = {"order-events"})class OrderProjectionTest { @Autowired private EmbeddedKafkaBroker embeddedKafka; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @Test void shouldProjectOrderCreatedEvent() throws Exception { // 发送 ORDER_CREATED 事件 String event = JSON.toJSONString(Map.of( "eventType", "ORDER_CREATED", "orderId", "ORD-1", "customerId", "CUST-1", "amount", 100.0 )); kafkaTemplate.send("order-events", "ORD-1", event).get(); // 等待投影更新 Thread.sleep(2000); // 验证读模型 OrderView view = orderViewRepository.findById("ORD-1").orElseThrow(); assertEquals("CUST-1", view.getCustomerId()); assertEquals(OrderStatus.CREATED, view.getStatus()); }}10.4 端到端测试:Testcontainers
@Testcontainersclass OrderSystemE2ETest { @Container static final KafkaContainer kafka = new KafkaContainer( DockerImageName.parse("confluentinc/cp-kafka:7.6.0") ); @Container static final MySQLContainer<?> mysql = new MySQLContainer<>( DockerImageName.parse("mysql:8.0") ).withCommand("--binlog-format=ROW"); @Test void shouldCompleteOrderFlow() { // 1. 创建订单 → 验证事件发送 Order order = orderService.createOrder(request); await().atMost(5, SECONDS).until(() -> kafkaConsumer.records("order-events").size() >= 1 ); // 2. 支付订单 → 验证库存扣减 orderService.payOrder(order.getId(), order.getAmount()); await().atMost(5, SECONDS).until(() -> inventoryService.getLockedQuantity(order.getId()) > 0 ); // 3. 验证通知发送 await().atMost(5, SECONDS).until(() -> notificationService.getSentCount(order.getId()) >= 1 ); }}10.5 测试策略对比
| 测试类型 | 范围 | 速度 | 数量 | 关键工具 |
|---|---|---|---|---|
| 单元测试 | 聚合根逻辑 | 毫秒级 | 大量 | JUnit 5 |
| 集成测试 | 投影 + 消费者 | 秒级 | 中等 | Embedded Kafka |
| 契约测试 | Schema 兼容性 | 毫秒级 | 每次变更 | Schema Registry API |
| 端到端测试 | 全链路 | 分钟级 | 少量 | Testcontainers |
| 故障注入 | 容错恢复 | 分钟级 | 关键场景 | Chaos Monkey |
事件驱动系统的测试关键是”事件驱动”——不要测 HTTP 接口,测事件流。单元测试验证聚合根的事件产生逻辑,集成测试验证投影的正确性,端到端测试验证全链路事件传递。Schema 兼容性测试应该在 CI 中自动执行。
十一、故障演练
11.1 故障场景与恢复
| 故障场景 | 影响 | 恢复策略 |
|---|---|---|
| Kafka Broker 宕机 | 消息发送暂停 | ISR 自动切换 Leader |
| Schema 不兼容 | 消息发送失败 | 回滚 Schema 或创建新版本 |
| Debezium 停止 | Outbox 堆积 | 重启 Connector,自动恢复 |
| DB 主从切换 | 短暂不可用 | 应用层重试 |
| 消费者 OOM | 消费暂停 | 重启消费者,从上次 Offset 继续 |
| 投影延迟 | 读模型不一致 | 监控 lag,必要时重建投影 |
11.2 投影重建
当读模型与写模型不一致时(如投影 Bug 导致数据错误),需要重建投影:
定期进行故障演练——在非生产环境中模拟各种故障场景,验证系统的恢复能力。特别是:1)Kafka Broker 故障时的 Leader 切换;2)Schema 不兼容时的回滚流程;3)Debezium 故障时的 Outbox 堆积处理;4)投影 Bug 时的重建流程。
十二、总结
上一章理解了消息与数据库一致性。
| 维度 | 关键要点 |
|---|---|
| 系统架构 | Kafka + Schema Registry + Debezium + Kafka Streams |
| Schema 管理 | Avro Schema + Schema Registry,向后兼容策略 |
| 一致性保证 | 事务性发件箱 + Debezium CDC,替代双写 |
| 事件溯源 | 聚合根 + 事件存储,事件作为数据唯一来源 |
| CQRS | 写模型(事件溯源)+ 读模型(投影),最终一致 |
| 流处理 | Kafka Streams 窗口聚合 + 异常检测 |
| 有序性 | orderId 作为 Key,保证同一订单事件有序 |
| 监控 | Consumer Lag + Debezium 延迟 + Schema 版本 |
| 测试 | 单元(聚合根)+ 集成(投影)+ E2E(全链路) |
| 故障演练 | 定期模拟故障,验证恢复能力 |
构建事件驱动系统的核心不是选择哪个消息队列,而是设计好三个关键环节:1)消息与数据库的一致性(发件箱模式);2)消息格式的版本管理(Schema Registry);3)消息的流式处理(Kafka Streams)。这三个环节做好了,任何消息队列都能支撑你的系统。
本系列到此结束。从 消息系统全景 开始,走过了消息语义、Kafka 架构与存储、RabbitMQ 智能路由、RocketMQ 事务消息、Pulsar 分层架构、消息有序性、积压与反压、事件溯源与 CQRS、Schema 演化、消息选型、数据库一致性,最终构建了完整的事件驱动系统。希望这个系列能帮助你深入理解消息系统的设计精髓。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






