mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
1525 字
4 分钟
Kafka Streams 与流处理
2026-03-29

Kafka 是一个消息队列——这句话只说对了一半。从 0.10 版本开始,Kafka 内置了 Kafka Streams 库,它让 Kafka 直接具备流处理能力:实时聚合、窗口计算、流表连接,不需要 Flink 或 Spark。更反直觉的是,Kafka Streams 不是一个独立集群——它只是一个嵌入应用程序的 Java 库,状态存储在本地 RocksDB,容错靠 Kafka 的 compacted topic 做检查点。流表二象性(Stream-Table Duality)是它的核心思想:流是表的变更日志,表是流的物化结果。

一、流处理基础#

1.1 什么是流处理?#

流处理是对无界数据流的持续、实时处理。与批处理不同,流处理的核心特征是:

维度批处理流处理
数据边界有界(固定大小)无界(持续到达)
处理时机积攒后统一处理到达即处理
延迟分钟~小时毫秒~秒
结果产出一次性输出持续输出
容错重新跑批检查点 + 重放
graph LR subgraph "批处理" B1["数据积攒"] --> B2["定时触发"] B2 --> B3["批量计算"] B3 --> B4["输出结果"] end subgraph "流处理" S1["事件到达"] --> S2["实时计算"] S2 --> S3["持续输出"] S3 --> S2 end

1.2 Kafka Streams 的定位#

Kafka Streams 是一个轻量级流处理库(不是独立集群),直接嵌入应用程序中:

特性Kafka StreamsApache FlinkApache Spark Streaming
部署模式嵌入应用独立集群独立集群
依赖仅 Kafka集群 + 资源管理集群 + 资源管理
延迟毫秒级毫秒级秒级(微批)
状态管理RocksDB + KafkaRocksDB/堆内存堆内存
exactly-once支持支持支持
适用规模中小规模大规模大规模
Note

Kafka Streams 不需要独立集群——它只是一个 Java 库。这使得它非常适合中小规模的流处理场景,无需运维额外的集群基础设施。

二、KStream 与 KTable#

2.1 两种核心抽象#

Kafka Streams 提供两种核心数据结构:

抽象语义类比更新行为
KStream事件流数据库的 Insert 日志每条记录都是新事件
KTable物化视图数据库的表相同 Key 的记录是更新
// KStream:每条记录都是独立事件
// 订单事件流:order-1 创建, order-1 支付, order-1 发货
KStream<String, OrderEvent> orderEvents = builder.stream("orders");
// 输出:order-1 → {type: CREATED}
// order-1 → {type: PAID}
// order-1 → {type: SHIPPED}
// KTable:相同 Key 的记录是更新
// 订单最新状态:order-1 的状态不断更新
KTable<String, OrderState> orderStates = builder.table("order-states");
// 输出:order-1 → {status: CREATED}
// order-1 → {status: PAID} (覆盖 CREATED)
// order-1 → {status: SHIPPED} (覆盖 PAID)

2.2 流表二象性#

KStream 和 KTable 可以相互转换,这就是”流表二象性”:

graph LR subgraph "流 → 表" KS["KStream<br/>(事件日志)"] -->|aggregate/groupBy| KT["KTable<br/>(物化视图)"] end subgraph "表 → 流" KT2["KTable<br/>(物化视图)"] -->|toStream| KS2["KStream<br/>(变更日志)"] end KS -.->|"Changelog"| KT2 KT2 -.->|"Snapshot"| KS
转换操作含义
流 → 表aggregate(), count(), reduce()将事件流聚合为当前状态
表 → 流toStream()将状态变更作为事件发出
表 → 流toStream().map()将状态变更转换为其他事件
// 流 → 表:订单金额汇总
KStream<String, Order> orders = builder.stream("orders");
KTable<String, Double> customerTotals = orders
.groupBy((key, order) -> KeyValue.pair(order.getCustomerId(), order.getAmount()))
.aggregate(
() -> 0.0, // 初始值
(customerId, amount, total) -> total + amount, // 累加器
Materialized.with(Serdes.String(), Serdes.Double()) // 状态存储
);
// 表 → 流:金额变更事件
KStream<String, Double> totalChanges = customerTotals.toStream();
totalChanges.to("customer-total-changes");

三、窗口聚合#

3.1 窗口类型#

流处理中的聚合需要定义时间窗口,Kafka Streams 提供四种窗口:

窗口类型特点适用场景是否对齐
Tumbling Window固定大小,不重叠每分钟统计
Hopping Window固定大小,可重叠滑动平均
Sliding Window基于事件时间,可变大小去重、会话
Session Window基于活动间隔用户会话分析
graph TB subgraph "Tumbling Window(翻滚窗口)" T1["[0, 10)"] T2["[10, 20)"] T3["[20, 30)"] end subgraph "Hopping Window(跳跃窗口)" H1["[0, 10)"] H2["[5, 15)"] H3["[10, 20)"] end subgraph "Session Window(会话窗口)" S1["[0, 8)"] S2["[15, 25)"] S3["[30, 35)"] end

3.2 窗口聚合实现#

// 翻滚窗口:每 5 分钟统计订单量
KStream<String, Order> orders = builder.stream("orders");
KTable<Windowed<String>, Long> orderCounts = orders
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count(Materialized.as("order-counts-store"));
// 跳跃窗口:每 1 分钟滑动,窗口大小 5 分钟
KTable<Windowed<String>, Double> movingAvg = orders
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1)))
.aggregate(
() -> 0.0,
(key, order, aggregate) -> aggregate + order.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
// 会话窗口:30 分钟不活跃则关闭会话
KTable<Windowed<String>, Integer> sessionCounts = orders
.groupByKey()
.windowedBy(SessionWindows.with(Duration.ofMinutes(30)))
.aggregate(
() -> 0,
(key, order, count) -> count + 1,
(aggKey, leftAgg, rightAgg) -> leftAgg + rightAgg,
Materialized.with(Serdes.String(), Serdes.Integer())
);

3.3 事件时间与处理时间#

时间语义含义优点缺点
事件时间事件发生的时间结果确定,可重放需要处理迟到数据
处理时间事件被处理的时间简单,低延迟结果不确定
// 使用事件时间(推荐)
// 通过 TimestampExtractor 提取事件中的时间戳
public class OrderTimestampExtractor implements TimestampExtractor {
@Override
public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
Order order = (Order) record.value();
return order.getEventTime(); // 使用事件中的时间戳
}
}
// 配置
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
OrderTimestampExtractor.class.getName());
// 处理迟到数据
// 设置允许的迟到时间
.windowedBy(TimeWindows.of(Duration.ofMinutes(5))
.grace(Duration.ofMinutes(1))) // 允许 1 分钟迟到

四、状态存储#

4.1 状态类型#

Kafka Streams 的状态分为两种:

状态类型存储容量恢复方式
本地状态RocksDB + 内存大(磁盘)Changelog Topic 重放
全局状态RocksDB + 内存Global Table
// 状态存储配置
StoreBuilder<KeyValueStore<String, Long>> countStore = Stores
.keyValueStoreBuilder(
Stores.persistentKeyValueStore("order-counts"), // RocksDB 持久化
Serdes.String(),
Serdes.Long()
)
.withCachingEnabled() // 启用缓存(减少 I/O)
.withLoggingEnabled(); // 启用 Changelog(容错)
builder.addStateStore(countStore);
// 在 Processor 中使用状态存储
public class OrderProcessor implements Processor<String, Order, String, Order> {
private KeyValueStore<String, Long> countStore;
@Override
public void init(ProcessorContext<String, Order> context) {
this.countStore = context.getStateStore("order-counts");
}
@Override
public void process(Record<String, Order> record) {
String customerId = record.value().getCustomerId();
Long count = countStore.get(customerId);
countStore.put(customerId, count == null ? 1 : count + 1);
}
}

4.2 Changelog Topic#

状态存储的容错通过 Changelog Topic 实现:

sequenceDiagram participant App as Streams 应用 participant Store as 本地状态存储<br/>(RocksDB) participant CL as Changelog Topic participant B as Kafka Broker Note over App: 正常运行 App->>Store: 写入状态更新 App->>CL: 发送状态变更到 Changelog CL->>B: 持久化 Note over App: 应用崩溃! Note over Store: 状态丢失 Note over App: 应用重启 App->>B: 从 Changelog Topic 重放 B->>Store: 恢复状态 Note over Store: 状态恢复完成
# 查看 Changelog Topic
kafka-topics --list --bootstrap-server localhost:9092 | grep changelog
# 输出:order-counts-store-changelog
# customer-totals-store-changelog
# Changelog Topic 配置
# cleanup.policy=compact (保留最新值)
# segment.bytes=67108864 (64MB)
# retention.bytes=-1 (无大小限制)

五、拓扑与 DSL#

5.1 处理拓扑#

Kafka Streams 的处理逻辑被建模为有向无环图(DAG):

// 使用 DSL 构建拓扑
StreamsBuilder builder = new StreamsBuilder();
// Source: 从 Kafka 读取
KStream<String, Order> orders = builder.stream("orders");
// Process: 过滤无效订单
KStream<String, Order> validOrders = orders
.filter((key, order) -> order.getAmount() > 0);
// Transform: 按客户分组
KGroupedStream<String, Order> grouped = validOrders
.groupBy((key, order) -> KeyValue.pair(order.getCustomerId(), order));
// Aggregate: 计算客户订单总额
KTable<String, Double> totals = grouped
.aggregate(
() -> 0.0,
(key, order, total) -> total + order.getAmount(),
Materialized.with(Serdes.String(), Serdes.Double())
);
// Sink: 写回 Kafka
totals.toStream().to("customer-totals");
// 查看拓扑描述
Topology topology = builder.build();
System.out.println(topology.describe());
graph LR S["Source:<br/>orders"] --> F["Filter:<br/>amount > 0"] F --> G["GroupBy:<br/>customerId"] G --> A["Aggregate:<br/>sum(amount)"] A --> SK["Sink:<br/>customer-totals"]

5.2 常用 DSL 操作#

操作类型操作说明
无状态filter()过滤记录
无状态map()转换记录
无状态flatMap()一条变多条
无状态branch()按条件分流
有状态groupByKey()按 Key 分组
有状态aggregate()自定义聚合
有状态count()计数
有状态reduce()归约
连接join()KStream-KStream
连接leftJoin()KStream-KTable
// KStream-KTable Join:订单 + 客户信息
KStream<String, Order> orders = builder.stream("orders");
KTable<String, Customer> customers = builder.table("customers");
KStream<String, EnrichedOrder> enriched = orders.join(
customers,
(order, customer) -> new EnrichedOrder(order, customer)
);
// KStream-KStream Join:订单 + 支付
KStream<String, Order> orders = builder.stream("orders");
KStream<String, Payment> payments = builder.stream("payments");
KStream<String, OrderPayment> joined = orders.join(
payments,
(order, payment) -> new OrderPayment(order, payment),
JoinWindows.of(Duration.ofMinutes(5)) // 5 分钟内匹配
);

六、生产部署#

6.1 线程与任务模型#

graph TB subgraph "Kafka Streams 应用" T1["Thread 1"] T2["Thread 2"] subgraph "T1 的任务" TASK1["Task 0<br/>Partition 0"] TASK2["Task 1<br/>Partition 1"] end subgraph "T2 的任务" TASK3["Task 2<br/>Partition 2"] TASK4["Task 3<br/>Partition 3"] end end subgraph "Kafka Topic" P0["Partition 0"] P1["Partition 1"] P2["Partition 2"] P3["Partition 3"] end P0 --> TASK1 P1 --> TASK2 P2 --> TASK3 P3 --> TASK4
概念说明配置
Task分区的处理单元= 输入 Topic 分区数
Thread任务执行线程num.stream.threads
Instance应用实例可水平扩展
# Kafka Streams 配置
# 线程数(每个实例)
num.stream.threads=4
# 状态目录
state.dir=/var/lib/kafka-streams
# exactly-once
processing.guarantee=exactly_once_v2
# 缓存大小
cache.max.bytes.buffering=10485760 # 10MB
# 提交间隔
commit.interval.ms=1000
# 应用 ID(唯一标识)
application.id=order-streams-app

6.2 监控指标#

指标含义告警条件
commit-latency-avg提交延迟> 1s
process-rate处理速率突然下降
commit-rate提交速率过低
task-commit-latency-avg任务提交延迟> 500ms
skipped-records-rate跳过记录率> 0
# 查看 Streams 应用状态
kafka-consumer-groups --bootstrap-server localhost:9092 \
--describe --group order-streams-app
# JMX 监控
# kafka.streams:type=stream-task-metrics,task-id=*
# kafka.streams:type=stream-processor-metrics,processor-id=*
Info

Kafka Streams 的任务数等于输入 Topic 的分区数。如果分区数 > 实例数 × 线程数,部分任务会处于未分配状态。确保实例数 × 线程数 ≥ 分区数。

6.3 交互式查询#

Kafka Streams 提供交互式查询(Interactive Queries)能力,允许外部应用直接查询状态存储中的数据,而无需将结果写回 Kafka 再由消费者读取:

// 启用交互式查询
KTable<String, Double> customerTotals = orders
.groupBy((key, order) -> KeyValue.pair(order.getCustomerId(), order.getAmount()))
.aggregate(
() -> 0.0,
(customerId, amount, total) -> total + amount,
Materialized.as("customer-totals-queryable-store") // 命名存储,可查询
);
// 查询本地状态存储
ReadOnlyKeyValueStore<String, Double> store =
kafkaStreams.store("customer-totals-queryable-store", QueryableStoreTypes.keyValueStore());
// 按 Key 查询
Double total = store.get("customer-123");
// 范围查询
KeyValueIterator<String, Double> range = store.range("customer-100", "customer-200");
查询类型方法说明
点查询store.get(key)查询指定 Key 的值
范围查询store.range(from, to)查询 Key 范围内的所有记录
前缀扫描store.prefixScan(prefix)按前缀扫描(仅 KeyValueStore)
全量遍历store.all()遍历所有记录(慎用)
Note

交互式查询只能查询本地实例的状态存储。如果需要查询的数据在其他实例上,需要通过 StreamsMetadata 定位到正确的实例,再通过 REST API 转发请求。

// 查找 Key 所在的实例
StreamsMetadata metadata = kafkaStreams.streamsMetadataForStore("customer-totals-queryable-store", "customer-123");
String host = metadata.host();
int port = metadata.port();
// 通过 HTTP 转发查询请求到正确的实例
// GET http://{host}:{port}/state/customer-totals-queryable-store/customer-123

6.4 实战案例:实时订单监控#

将前面所有概念组合起来,构建一个实时订单监控系统:

StreamsBuilder builder = new StreamsBuilder();
// 1. 读取订单事件流
KStream<String, OrderEvent> orders = builder.stream("orders");
// 2. 过滤有效订单
KStream<String, OrderEvent> validOrders = orders
.filter((key, event) -> event.getAmount() != null && event.getAmount() > 0);
// 3. 按客户分组,5 分钟翻滚窗口统计
KTable<Windowed<String>, OrderSummary> windowedSummary = validOrders
.groupBy((key, event) -> KeyValue.pair(event.getCustomerId(), event))
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
.aggregate(
OrderSummary::new,
(customerId, event, summary) -> summary.add(event),
Materialized.with(Serdes.String(), new JsonSerde<>(OrderSummary.class))
);
// 4. 检测异常:5 分钟内消费超过阈值
KStream<String, Alert> alerts = windowedSummary
.toStream()
.filter((windowedKey, summary) -> summary.getTotalAmount() > 10000)
.map((windowedKey, summary) -> new KeyValue<>(
windowedKey.key(),
new Alert("HIGH_SPENDING", windowedKey.key(), summary.getTotalAmount())
));
// 5. 输出告警
alerts.to("order-alerts", Produced.with(Serdes.String(), new JsonSerde<>(Alert.class)));
// 6. 物化客户最新状态(可查询)
KTable<String, CustomerStats> customerStats = validOrders
.groupBy((key, event) -> KeyValue.pair(event.getCustomerId(), event))
.aggregate(
CustomerStats::new,
(customerId, event, stats) -> stats.update(event),
Materialized.as("customer-stats-store")
);
graph LR ORDERS["orders<br/>Topic"] --> FILTER["Filter<br/>amount > 0"] FILTER --> GROUP["GroupBy<br/>customerId"] GROUP --> WINDOW["Windowed<br/>5min Tumbling"] WINDOW --> AGG["Aggregate<br/>OrderSummary"] AGG --> ALERT_FILTER["Filter<br/>total > 10000"] ALERT_FILTER --> ALERTS["order-alerts<br/>Topic"] AGG --> STATS["customer-stats-store<br/>(Queryable)"]

七、总结#

维度关键要点
KStream事件流抽象,每条记录都是新事件
KTable物化视图抽象,相同 Key 是更新
流表二象性KStream 和 KTable 可相互转换
窗口Tumbling/Hopping/Session 三种窗口,优先使用事件时间
状态RocksDB + Changelog Topic 实现本地状态容错
部署Task = 分区数,线程数 × 实例数 ≥ 分区数
Tip

Kafka Streams 最适合”Kafka 到 Kafka”的流处理场景——无需额外集群,直接嵌入应用。如果需要复杂计算、多数据源或大规模处理,考虑 Flink 或 Spark Streaming。

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Kafka Streams 与流处理
https://blog.souloss.com/posts/messaging/messaging-kafka-streams/
作者
Souloss
发布于
2026-03-29
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时