当你从单机数据库走向分布式系统,数据不再只存在于一张表里——它散落在消息队列、数据仓库、搜索引擎、缓存集群之中。如何高效地搬运、转换、聚合这些数据?批处理和流处理正是回答这个问题的两种范式。
本章从数据处理范式的分类出发,逐层深入:先看批处理如何用 MapReduce 和 Spark 处理海量离线数据,再看流处理如何用 Kafka 和 Flink 实现实时计算,然后讨论 Lambda 与 Kappa 架构的取舍,最后聚焦 CDC、Event Sourcing 与流表二象性——这些正在重新定义”数据库”边界的前沿理念。
前置知识
现代批处理始于 Google MapReduce(2004),Spark(2010)用内存计算提升了一个数量级。流处理方面,Kafka(2011)和 Flink(2015)是主流选择。Kappa 架构正在挑战 Lambda 架构的”批处理+流处理双轨制”。
一、数据处理范式
1.1 三种处理范式
数据处理系统按时效性可分为三类:批处理(Batch)、流处理(Stream)和交互式查询(Interactive)。
| 维度 | 批处理 | 流处理 | 交互式查询 |
|---|---|---|---|
| 数据范围 | 有界数据集(全量) | 无界数据流(增量) | 随机访问(点查/范围) |
| 延迟 | 分钟~小时 | 毫秒~秒 | 毫秒 |
| 典型场景 | ETL、报表、模型训练 | 实时监控、风控、推荐 | OLTP、在线分析 |
| 代表系统 | MapReduce、Spark | Flink、Kafka Streams | MySQL、ClickHouse |
| 数据模型 | 文件/表 | 事件流 | 行/列 |
1.2 范式的边界正在模糊
批处理与流处理的边界正在快速模糊。Spark Structured Streaming 统一了批流 API,Flink 的批处理模式已趋于成熟。现代数据引擎的趋势是统一批流——同一套 API、同一套引擎,只是执行策略不同。
三种范式并非互斥,而是互补。一个完整的数据系统通常同时需要三者:交互式查询服务在线业务,流处理捕获实时变化,批处理完成全量校验和复杂计算。
二、批处理
2.1 MapReduce:开山之作
2004 年 Google 发表 MapReduce 论文,用简单的编程模型解决了海量数据的并行计算问题。其核心思想:将计算推到数据所在节点,而非将数据拉到计算节点。
MapReduce 的执行流程分为 Map、Shuffle、Reduce 三个阶段:
一个经典的 WordCount 示例:
// Mapper:输入 (行号, 文本) → 输出 (单词, 1)public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { for (String token : value.toString().split("\\s+")) { word.set(token); context.write(word, new IntWritable(1)); } }}
// Reducer:输入 (单词, [1,1,...]) → 输出 (单词, 总数)public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); }}MapReduce 的设计哲学是容错优先:任何节点失败,只需重跑该节点的任务。但代价是极高的 I/O 开销——每个 MapReduce 作业之间必须落盘,Shuffle 阶段需要大量网络传输和磁盘排序。
2.2 MapReduce 的局限
| 问题 | 原因 | 影响 |
|---|---|---|
| 中间结果落盘 | 每个 MR 作业独立,Map 输出必须写磁盘 | 多轮迭代(如机器学习)性能极差 |
| 编程模型僵化 | 只有 Map 和 Reduce 两个原语 | Join、排序等操作需要手动链式组合 |
| 延迟高 | 作业调度 + Shuffle + 落盘 | 端到端延迟分钟级起步 |
| 资源利用率低 | Map 槽位和 Reduce 槽位固定 | Reduce 等待 Map 完成期间资源闲置 |
2.3 Spark:内存计算的突破
Spark 的核心创新是 RDD(Resilient Distributed Dataset)——一个不可变、分区的分布式数据集,支持内存中迭代计算,只在首次计算或丢失时读盘。
from pyspark.sql import SparkSessionfrom pyspark.sql import functions as F
spark = SparkSession.builder \ .appName("UserAnalytics") \ .config("spark.executor.memory", "4g") \ .getOrCreate()
# 读取数据(惰性求值,不立即执行)orders = spark.read.parquet("hdfs:///data/orders/")users = spark.read.parquet("hdfs:///data/users/")
# 转换操作(构建 DAG,仍然惰性)result = ( orders .join(users, orders.user_id == users.id) # Join .filter(F.col("status") == "completed") # 过滤 .groupBy("city") # 分组 .agg( F.sum("amount").alias("total_amount"), F.count("*").alias("order_count"), F.avg("amount").alias("avg_amount") ) .orderBy(F.desc("total_amount")) # 排序)
# 行动操作(触发实际计算)result.show(10)result.write.mode("overwrite").parquet("hdfs:///output/city_stats/")Spark 与 MapReduce 的关键区别:
| 维度 | MapReduce | Spark |
|---|---|---|
| 执行模型 | 每阶段落盘 | 内存迭代(Lineage 血缘恢复) |
| 编程模型 | Map + Reduce | RDD / DataFrame / SQL |
| 迭代性能 | 每轮读写磁盘 | 中间结果缓存在内存 |
| 延迟 | 分钟级 | 秒级(交互式查询) |
| 容错 | 重跑失败任务 | 基于 DAG 血缘重算分区 |
| 生态 | Hadoop 生态 | Spark SQL / Streaming / MLlib / GraphX |
Spark 的内存计算并非银弹。当数据量超过内存容量时,Spark 会触发磁盘溢写(Spill),性能急剧下降。生产环境中合理设置 spark.executor.memory、spark.memory.fraction 和 spark.sql.shuffle.partitions 至关重要。此外,数据倾斜(Skew)会导致少数 Task 成为瓶颈,需要通过 Salting 或 Adaptive Query Execution(AQE)缓解。
三、流处理
3.1 事件流与消息队列
流处理的前提是有一个可靠的事件流基础设施。Kafka 是目前最主流的分布式消息队列,其核心设计理念是分布式提交日志——消息按追加写入(Append-Only),消费者通过偏移量(Offset)自主控制消费进度。
Kafka 的关键特性:
| 特性 | 说明 | 与传统消息队列的区别 |
|---|---|---|
| 持久化 | 消息写入磁盘,可配置保留时间 | RabbitMQ 消费后即删除 |
| 回放 | 消费者可重置 Offset 重新消费 | 传统队列消费后不可回溯 |
| 分区 | Topic 分为多个 Partition,并行读写 | 单队列串行消费 |
| 消费者组 | 同组内消费者分摊 Partition | 广播 vs 分担消费 |
| 高吞吐 | 顺序写 + 零拷贝 + 批量压缩 | 吞吐量可达百万级 QPS |
3.2 Flink:流优先的架构
Flink 的设计哲学是流是基础,批是流的特例。与 Spark Streaming 的微批(Micro-batch)模型不同,Flink 是真正的逐事件处理引擎。
// Flink 实时订单统计StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<Order> source = KafkaSource.<Order>builder() .setBootstrapServers("kafka:9092") .setTopics("orders") .setGroupId("order-stats") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new JsonDeserializationSchema<>(Order.class)) .build();
DataStream<Order> orders = env.fromSource( source, WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((event, ts) -> event.getTimestamp()), "Kafka Source");
// 窗口聚合:每5分钟统计各城市订单金额orders.filter(o -> o.getStatus().equals("completed")) .keyBy(Order::getCity) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .aggregate(new OrderAggregator()) .addSink(new ElasticsearchSink<>());
env.execute("Real-time Order Analytics");3.3 窗口:流处理的核心抽象
流数据是无限的,要得到有意义的结果必须对无限流进行”切割”——这就是窗口(Window)。Flink 提供了四种核心窗口类型:
| 窗口类型 | 定义 | 典型场景 | 示例 |
|---|---|---|---|
| 滚动窗口 | 固定大小,不重叠 | 每分钟统计 | 每小时 UV |
| 滑动窗口 | 固定大小,有重叠 | 滑动平均 | 过去 5 分钟每 1 分钟的移动均值 |
| 会话窗口 | 按活跃度动态划分 | 用户行为分析 | 用户操作间隔超 30 分钟则切分 |
| 全局窗口 | 无边界,需自定义触发器 | 自定义逻辑 | 累计统计 + 定期刷新 |
3.4 时间语义与 Watermark
流处理中最棘手的问题是乱序事件:事件产生的时间(Event Time)与被处理的时间(Processing Time)不一致。Flink 通过 Watermark 机制解决这一问题:
// Watermark 策略:允许5秒的乱序WatermarkStrategy<Order> watermarkStrategy = WatermarkStrategy .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(5)) .withTimestampAssigner((order, recordTimestamp) -> order.getEventTime()) .withIdleness(Duration.ofMinutes(1)); // 空闲分区处理
// 延迟数据处理:侧输出OutputTag<Order> lateDataTag = new OutputTag<Order>("late-data") {};
SingleOutputStreamOperator<CityStats> result = orders .keyBy(Order::getCity) .window(TumblingEventTimeWindows.of(Time.minutes(5))) .allowedLateness(Time.minutes(1)) // 允许1分钟延迟 .sideOutputLateData(lateDataTag) // 超出延迟的数据进入侧输出 .aggregate(new OrderAggregator());
// 单独处理延迟数据DataStream<Order> lateOrders = result.getSideOutput(lateDataTag);lateOrders.addSink(new AlertSink()); // 告警或写入冷存储3.5 Checkpoint:精确一次语义
Flink 通过 Chandy-Lamport 算法 的变体实现分布式快照,保证 Exactly-Once 语义:
- Checkpoint Barrier 注入数据流,随数据流动
- 算子收到 Barrier 后暂缓处理,将当前状态快照写入持久存储
- 所有算子完成快照后,Checkpoint 成功
- 故障时从最近成功的 Checkpoint 恢复
# Flink Checkpoint 配置env.enable_checkpointing(60000) # 每60秒一次Checkpointenv.get_checkpoint_config().set_checkpointing_mode(CheckpointingMode.EXACTLY_ONCE)env.get_checkpoint_config().set_checkpoint_timeout(120000) # 超时2分钟env.get_checkpoint_config().set_min_pause_between_checkpoints(30000) # 最小间隔30秒env.get_checkpoint_config().set_max_concurrent_checkpoints(1) # 最多1个并发Checkpointenv.get_checkpoint_config().set_tolerable_checkpoint_failure_number(3) # 允许3次失败Checkpoint 间隔是吞吐量与恢复速度的权衡。间隔越短,恢复时重算的数据越少,但 Checkpoint 本身的 I/O 开销越大。生产环境通常设置为 30 秒~5 分钟,并开启增量 Checkpoint(RocksDB 状态后端)以降低开销。
四、Lambda 与 Kappa 架构
4.1 Lambda 架构
Nathan Marz 在 2011 年提出 Lambda 架构,核心思想是批处理保证准确性,流处理保证实时性,两者结果合并:
Lambda 架构的优缺点:
| 优点 | 缺点 |
|---|---|
| 批处理层保证最终正确性 | 同一逻辑需要实现两套代码 |
| 速度层提供低延迟近似结果 | 运维两套系统的复杂度 |
| 容错性好(批处理可重跑) | 合并逻辑复杂,调试困难 |
| 数据不可变,可回溯 | 存储成本高(全量 + 增量) |
4.2 Kappa 架构
Jay Kreps(Kafka 作者)提出 Kappa 架构作为 Lambda 的简化:只保留流处理层,通过回放日志实现批处理。
Kappa 架构的关键前提:消息队列必须能长期保留数据(Kafka 的 Log Compaction 或长保留期),以便回放。
| 维度 | Lambda 架构 | Kappa 架构 |
|---|---|---|
| 代码复杂度 | 两套(批+流) | 一套(流) |
| 运维复杂度 | 高(两套系统) | 低(一套系统) |
| 批处理能力 | 原生支持 | 通过回放模拟 |
| 历史重算 | 批处理层自动覆盖 | 需启动新 Job 回放 |
| 适用场景 | 复杂计算、机器学习 | 实时性要求高、逻辑简单 |
| 存储依赖 | HDFS + 实时库 | Kafka 长期保留 |
现代实践趋向于混合方案:日常计算用 Kappa 架构(Flink 统一批流),复杂离线分析仍用 Spark 批处理。Flink 1.12+ 的批流一体模式使得同一套代码可以同时运行在流模式和批模式下,进一步模糊了 Lambda 与 Kappa 的边界。
五、CDC 变更数据捕获
5.1 什么是 CDC
CDC(Change Data Capture)是一种监听数据库变更并将其作为事件流输出的技术。它将数据库的每一次 INSERT、UPDATE、DELETE 操作转化为结构化事件,使下游系统能够实时响应数据变化。
CDC 的核心价值:将数据库从”被轮询的存储”变为”主动推送的事件源”。
5.2 CDC 的实现方式
| 方式 | 原理 | 优点 | 缺点 |
|---|---|---|---|
| 查询模式 | 定期 SELECT 轮询 | 实现简单 | 延迟高、对源库压力大 |
| 日志模式 | 解析 WAL/Binlog | 零侵入、低延迟 | 需要解析日志格式 |
| 触发器模式 | 数据库触发器捕获变更 | 实时性好 | 对源库性能有影响 |
| API 模式 | 数据库原生 CDC 接口 | 最可靠 | 仅部分数据库支持 |
5.3 Debezium:开源 CDC 平台
Debezium 是目前最流行的开源 CDC 平台,基于 Kafka Connect 框架,通过解析数据库日志实现变更捕获:
// Debezium MySQL Connector 核心配置{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.name": "dbserver1", "database.include.list": "inventory", "table.include.list": "inventory.orders,inventory.users", "snapshot.mode": "schema_only" }}Debezium 输出的变更事件包含 before(变更前状态)、after(变更后状态)、source(源库元信息)和 op(操作类型:c=Create, u=Update, d=Delete, r=Read/Snapshot)等字段,完整记录了每一次数据变更的上下文。
CDC 与数据复制的关系:CDC 本质上是数据复制中”基于日志的复制”的延伸——它将数据库内部的 WAL/Binlog 转化为外部可消费的事件流,使得复制不再局限于同构数据库之间。
六、Event Sourcing 与 CQRS
6.1 Event Sourcing:以事件为事实
传统 CRUD 模式只保存数据的当前状态,丢失了所有变更历史。Event Sourcing 反转了这一思路:不存储当前状态,只存储导致状态变化的事件序列——当前状态是事件回放(Projection)的函数。
Event Sourcing 的代码示例:
// 传统 CRUD:只保存最终状态,旧状态丢失public class OrderService { public void updateOrderStatus(Long orderId, String status) { Order order = orderRepository.findById(orderId); order.setStatus(status); orderRepository.save(order); }}
// Event Sourcing:保存事件序列,状态由回放派生public class EventSourcedOrderService { public void handle(CreateOrderCommand cmd) { eventStore.append(cmd.getOrderId(), List.of( new OrderCreatedEvent(cmd.getOrderId(), cmd.getItems(), cmd.getTotal()))); } public void handle(PayOrderCommand cmd) { OrderState state = eventStore.replay(cmd.getOrderId()); if (state.getStatus() != Status.CREATED) throw new IllegalStateException("订单已支付"); eventStore.append(cmd.getOrderId(), List.of( new OrderPaidEvent(cmd.getOrderId(), cmd.getAmount(), cmd.getMethod()))); }}6.2 CQRS:命令与查询分离
CQRS(Command Query Responsibility Segregation)将写入模型和读取模型分离,各自独立优化:
| 维度 | 命令侧(写) | 查询侧(读) |
|---|---|---|
| 数据模型 | Event Store(追加写) | 物化视图(读优化) |
| 一致性 | 强一致(事件不可变) | 最终一致(异步投影) |
| 优化方向 | 写入吞吐 | 查询性能 |
| 存储选择 | Event Store / Kafka | Redis / Elasticsearch / ClickHouse |
| 扩展方式 | 分区(按聚合根) | 复制(多副本读) |
# CQRS 读写分离示例:查询侧从事件流构建读模型class OrderReadModel: def __init__(self, redis_client): self.redis = redis_client
def handle_event(self, event): """消费事件,更新 Redis 读模型""" key = f"order:{event['order_id']}" if event["type"] == "OrderCreated": self.redis.hset(key, mapping={"status": "created", "total": event["total"]}) elif event["type"] == "OrderPaid": self.redis.hset(key, mapping={"status": "paid", "paid_at": event["timestamp"]}) elif event["type"] == "OrderShipped": self.redis.hset(key, mapping={"status": "shipped", "tracking": event["tracking"]})
def get_order(self, order_id): return self.redis.hgetall(f"order:{order_id}")6.3 Event Sourcing 的权衡
| 优点 | 缺点 |
|---|---|
| 完整审计追踪 | 事件模式演化复杂 |
| 时间旅行(回放到任意时刻) | 回放性能随事件量增长下降 |
| 天然支持事件驱动架构 | 学习曲线陡峭 |
| 与 CDC 天然契合 | 最终一致性带来调试困难 |
| 事件不可变,天然审计 | 存储成本(所有事件永久保留) |
Event Sourcing 不是银弹。大多数业务只需要 CRUD + 审计日志就足够了。只有在事件本身是业务核心资产的场景(金融交易、合规审计、协作编辑)中,Event Sourcing 才值得引入。过早采用 Event Sourcing 会带来不必要的复杂度。
七、流表二象性
7.1 什么是流表二象性
流表二象性(Stream-Table Duality)是流处理中最深刻的概念之一:流是表的变更日志,表是流在某个时刻的快照。
这个概念在多个技术中反复出现:
| 技术 | 流 → 表 | 表 → 流 |
|---|---|---|
| Kafka | KTable = 日志压缩后的快照 | KStream = 不断追加的变更事件 |
| Flink | 窗口聚合 = 流折叠为表 | CDC Source = 表变更转为流 |
| 数据库 | 物化视图 = 查询结果的持续维护 | WAL/Binlog = 表变更的事件流 |
| Event Sourcing | 投影 = 事件回放为当前状态 | 命令 = 状态变更产生新事件 |
7.2 Kafka 的 KStream 与 KTable
// Kafka Streams:流表二象性的实际应用StreamsBuilder builder = new StreamsBuilder();
// KStream:每条记录都是独立事件(点击流)KStream<String, ClickEvent> clicks = builder.stream("click-events");
// KTable:每个 key 只保留最新值(用户信息)KTable<String, UserProfile> profiles = builder.table("user-profiles");
// 流表 Join:点击事件关联用户信息KStream<String, EnrichedClick> enriched = clicks.leftJoin( profiles, (click, profile) -> new EnrichedClick(click, profile));
// 流聚合为表:按用户统计点击量KTable<String, Long> clickCounts = clicks .groupBy((key, click) -> click.getUserId()) .count(Materialized.as("click-counts"));
// 表变更转为流:点击量变化事件KStream<String, Long> countChanges = clickCounts.toStream();7.3 动态表与连续查询
Flink SQL 将流表二象性推向了极致——用 SQL 同时表达批处理和流处理:
-- Flink SQL:同一套语法,批流统一CREATE TABLE orders ( order_id BIGINT, user_id BIGINT, amount DECIMAL(10, 2), status STRING, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND) WITH ( 'connector' = 'kafka', 'topic' = 'orders', 'properties.bootstrap.servers' = 'kafka:9092', 'format' = 'json');
-- 连续查询:每5分钟统计订单金额SELECT city, TUMBLE_START(event_time, INTERVAL '5' MINUTE) AS window_start, TUMBLE_END(event_time, INTERVAL '5' MINUTE) AS window_end, COUNT(*) AS order_count, SUM(amount) AS total_amountFROM ordersGROUP BY city, TUMBLE(event_time, INTERVAL '5' MINUTE);这段 SQL 在批模式下等价于一次性聚合历史数据,在流模式下则持续消费新事件并增量更新结果——同一套语义,两种执行模式。
7.4 流表二象性的实践意义
流表二象性不仅是理论概念,它直接指导架构设计:
| 传统思维 | 流表思维 | 收益 |
|---|---|---|
| 数据库是唯一的数据源 | 数据库只是流的一个物化视图 | 解耦读写,独立扩展 |
| 应用直连数据库查询 | 应用消费事件流构建本地视图 | 降低数据库压力 |
| 数据同步 = 定时 ETL | 数据同步 = 消费 CDC 流 | 实时性从小时级到秒级 |
| 微服务间通过 API 交互 | 微服务间通过事件流通信 | 松耦合、可回溯 |
八、总结
8.1 核心概念回顾
| 概念 | 一句话总结 | 关键取舍 |
|---|---|---|
| 批处理 | 处理有界数据集,吞吐优先 | 延迟高,但结果精确 |
| 流处理 | 处理无界数据流,延迟优先 | 需要处理乱序和窗口 |
| MapReduce | 简单但 I/O 密集的并行计算 | 已被 Spark 取代 |
| Spark | 内存迭代计算,批流统一 | 微批模型延迟较高 |
| Flink | 真正的流优先引擎 | 生态不如 Spark 成熟 |
| Lambda | 批+流双保险 | 代码和运维双重复杂 |
| Kappa | 只用流,回放替代批 | 依赖日志长期保留 |
| CDC | 数据库变更 → 事件流 | 日志解析的兼容性风险 |
| Event Sourcing | 事件即事实,状态是派生 | 事件演化复杂 |
| CQRS | 读写模型分离优化 | 最终一致性 |
| 流表二象性 | 流是表的变更日志,表是流的快照 | 统一批流的认知基础 |
8.2 技术选型决策树
8.3 从数据库到数据系统
批处理与流处理的核心启示是:数据库不再是孤立的数据孤岛,而是数据流中的一个节点。CDC 将数据库的变更输出为事件流,Event Sourcing 将事件作为第一公民,流表二象性统一了流与表的认知——这些概念正在重新定义”数据库”的边界。
在 数据复制 中,我们看到了日志如何驱动副本间的数据同步;在本章中,同样的日志成为了流处理的数据源。而在 数据库可靠性 中,可以看到这些日志如何成为故障恢复的基石——WAL、Binlog、事件日志,本质上都是同一件事的不同面向:不可变的、有序的、可回放的事件序列。
理解了这一点,你就理解了现代数据系统的核心架构原则:以日志为中心,以事件为驱动,以流表二象性为认知框架。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






