Kafka 是一个消息队列——这句话只说对了一半。从 0.10 版本开始,Kafka 内置了 Kafka Streams 库,它让 Kafka 直接具备流处理能力:实时聚合、窗口计算、流表连接,不需要 Flink 或 Spark。更反直觉的是,Kafka Streams 不是一个独立集群——它只是一个嵌入应用程序的 Java 库,状态存储在本地 RocksDB,容错靠 Kafka 的 compacted topic 做检查点。流表二象性(Stream-Table Duality)是它的核心思想:流是表的变更日志,表是流的物化结果。
一、流处理基础
1.1 什么是流处理?
流处理是对无界数据流的持续、实时处理。与批处理不同,流处理的核心特征是:
| 维度 | 批处理 | 流处理 |
|---|---|---|
| 数据边界 | 有界(固定大小) | 无界(持续到达) |
| 处理时机 | 积攒后统一处理 | 到达即处理 |
| 延迟 | 分钟~小时 | 毫秒~秒 |
| 结果产出 | 一次性输出 | 持续输出 |
| 容错 | 重新跑批 | 检查点 + 重放 |
1.2 Kafka Streams 的定位
Kafka Streams 是一个轻量级流处理库(不是独立集群),直接嵌入应用程序中:
| 特性 | Kafka Streams | Apache Flink | Apache Spark Streaming |
|---|---|---|---|
| 部署模式 | 嵌入应用 | 独立集群 | 独立集群 |
| 依赖 | 仅 Kafka | 集群 + 资源管理 | 集群 + 资源管理 |
| 延迟 | 毫秒级 | 毫秒级 | 秒级(微批) |
| 状态管理 | RocksDB + Kafka | RocksDB/堆内存 | 堆内存 |
| exactly-once | 支持 | 支持 | 支持 |
| 适用规模 | 中小规模 | 大规模 | 大规模 |
Kafka Streams 不需要独立集群——它只是一个 Java 库。这使得它非常适合中小规模的流处理场景,无需运维额外的集群基础设施。
二、KStream 与 KTable
2.1 两种核心抽象
Kafka Streams 提供两种核心数据结构:
| 抽象 | 语义 | 类比 | 更新行为 |
|---|---|---|---|
| KStream | 事件流 | 数据库的 Insert 日志 | 每条记录都是新事件 |
| KTable | 物化视图 | 数据库的表 | 相同 Key 的记录是更新 |
// KStream:每条记录都是独立事件// 订单事件流:order-1 创建, order-1 支付, order-1 发货KStream<String, OrderEvent> orderEvents = builder.stream("orders");// 输出:order-1 → {type: CREATED}// order-1 → {type: PAID}// order-1 → {type: SHIPPED}
// KTable:相同 Key 的记录是更新// 订单最新状态:order-1 的状态不断更新KTable<String, OrderState> orderStates = builder.table("order-states");// 输出:order-1 → {status: CREATED}// order-1 → {status: PAID} (覆盖 CREATED)// order-1 → {status: SHIPPED} (覆盖 PAID)2.2 流表二象性
KStream 和 KTable 可以相互转换,这就是”流表二象性”:
| 转换 | 操作 | 含义 |
|---|---|---|
| 流 → 表 | aggregate(), count(), reduce() | 将事件流聚合为当前状态 |
| 表 → 流 | toStream() | 将状态变更作为事件发出 |
| 表 → 流 | toStream().map() | 将状态变更转换为其他事件 |
// 流 → 表:订单金额汇总KStream<String, Order> orders = builder.stream("orders");
KTable<String, Double> customerTotals = orders .groupBy((key, order) -> KeyValue.pair(order.getCustomerId(), order.getAmount())) .aggregate( () -> 0.0, // 初始值 (customerId, amount, total) -> total + amount, // 累加器 Materialized.with(Serdes.String(), Serdes.Double()) // 状态存储 );
// 表 → 流:金额变更事件KStream<String, Double> totalChanges = customerTotals.toStream();totalChanges.to("customer-total-changes");三、窗口聚合
3.1 窗口类型
流处理中的聚合需要定义时间窗口,Kafka Streams 提供四种窗口:
| 窗口类型 | 特点 | 适用场景 | 是否对齐 |
|---|---|---|---|
| Tumbling Window | 固定大小,不重叠 | 每分钟统计 | 是 |
| Hopping Window | 固定大小,可重叠 | 滑动平均 | 是 |
| Sliding Window | 基于事件时间,可变大小 | 去重、会话 | 否 |
| Session Window | 基于活动间隔 | 用户会话分析 | 否 |
3.2 窗口聚合实现
// 翻滚窗口:每 5 分钟统计订单量KStream<String, Order> orders = builder.stream("orders");
KTable<Windowed<String>, Long> orderCounts = orders .groupByKey() .windowedBy(TimeWindows.of(Duration.ofMinutes(5))) .count(Materialized.as("order-counts-store"));
// 跳跃窗口:每 1 分钟滑动,窗口大小 5 分钟KTable<Windowed<String>, Double> movingAvg = orders .groupByKey() .windowedBy(TimeWindows.of(Duration.ofMinutes(5)) .advanceBy(Duration.ofMinutes(1))) .aggregate( () -> 0.0, (key, order, aggregate) -> aggregate + order.getAmount(), Materialized.with(Serdes.String(), Serdes.Double()) );
// 会话窗口:30 分钟不活跃则关闭会话KTable<Windowed<String>, Integer> sessionCounts = orders .groupByKey() .windowedBy(SessionWindows.with(Duration.ofMinutes(30))) .aggregate( () -> 0, (key, order, count) -> count + 1, (aggKey, leftAgg, rightAgg) -> leftAgg + rightAgg, Materialized.with(Serdes.String(), Serdes.Integer()) );3.3 事件时间与处理时间
| 时间语义 | 含义 | 优点 | 缺点 |
|---|---|---|---|
| 事件时间 | 事件发生的时间 | 结果确定,可重放 | 需要处理迟到数据 |
| 处理时间 | 事件被处理的时间 | 简单,低延迟 | 结果不确定 |
// 使用事件时间(推荐)// 通过 TimestampExtractor 提取事件中的时间戳public class OrderTimestampExtractor implements TimestampExtractor { @Override public long extract(ConsumerRecord<Object, Object> record, long partitionTime) { Order order = (Order) record.value(); return order.getEventTime(); // 使用事件中的时间戳 }}
// 配置props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, OrderTimestampExtractor.class.getName());
// 处理迟到数据// 设置允许的迟到时间.windowedBy(TimeWindows.of(Duration.ofMinutes(5)) .grace(Duration.ofMinutes(1))) // 允许 1 分钟迟到四、状态存储
4.1 状态类型
Kafka Streams 的状态分为两种:
| 状态类型 | 存储 | 容量 | 恢复方式 |
|---|---|---|---|
| 本地状态 | RocksDB + 内存 | 大(磁盘) | Changelog Topic 重放 |
| 全局状态 | RocksDB + 内存 | 大 | Global Table |
// 状态存储配置StoreBuilder<KeyValueStore<String, Long>> countStore = Stores .keyValueStoreBuilder( Stores.persistentKeyValueStore("order-counts"), // RocksDB 持久化 Serdes.String(), Serdes.Long() ) .withCachingEnabled() // 启用缓存(减少 I/O) .withLoggingEnabled(); // 启用 Changelog(容错)
builder.addStateStore(countStore);
// 在 Processor 中使用状态存储public class OrderProcessor implements Processor<String, Order, String, Order> { private KeyValueStore<String, Long> countStore;
@Override public void init(ProcessorContext<String, Order> context) { this.countStore = context.getStateStore("order-counts"); }
@Override public void process(Record<String, Order> record) { String customerId = record.value().getCustomerId(); Long count = countStore.get(customerId); countStore.put(customerId, count == null ? 1 : count + 1); }}4.2 Changelog Topic
状态存储的容错通过 Changelog Topic 实现:
# 查看 Changelog Topickafka-topics --list --bootstrap-server localhost:9092 | grep changelog# 输出:order-counts-store-changelog# customer-totals-store-changelog
# Changelog Topic 配置# cleanup.policy=compact (保留最新值)# segment.bytes=67108864 (64MB)# retention.bytes=-1 (无大小限制)五、拓扑与 DSL
5.1 处理拓扑
Kafka Streams 的处理逻辑被建模为有向无环图(DAG):
// 使用 DSL 构建拓扑StreamsBuilder builder = new StreamsBuilder();
// Source: 从 Kafka 读取KStream<String, Order> orders = builder.stream("orders");
// Process: 过滤无效订单KStream<String, Order> validOrders = orders .filter((key, order) -> order.getAmount() > 0);
// Transform: 按客户分组KGroupedStream<String, Order> grouped = validOrders .groupBy((key, order) -> KeyValue.pair(order.getCustomerId(), order));
// Aggregate: 计算客户订单总额KTable<String, Double> totals = grouped .aggregate( () -> 0.0, (key, order, total) -> total + order.getAmount(), Materialized.with(Serdes.String(), Serdes.Double()) );
// Sink: 写回 Kafkatotals.toStream().to("customer-totals");
// 查看拓扑描述Topology topology = builder.build();System.out.println(topology.describe());5.2 常用 DSL 操作
| 操作类型 | 操作 | 说明 |
|---|---|---|
| 无状态 | filter() | 过滤记录 |
| 无状态 | map() | 转换记录 |
| 无状态 | flatMap() | 一条变多条 |
| 无状态 | branch() | 按条件分流 |
| 有状态 | groupByKey() | 按 Key 分组 |
| 有状态 | aggregate() | 自定义聚合 |
| 有状态 | count() | 计数 |
| 有状态 | reduce() | 归约 |
| 连接 | join() | KStream-KStream |
| 连接 | leftJoin() | KStream-KTable |
// KStream-KTable Join:订单 + 客户信息KStream<String, Order> orders = builder.stream("orders");KTable<String, Customer> customers = builder.table("customers");
KStream<String, EnrichedOrder> enriched = orders.join( customers, (order, customer) -> new EnrichedOrder(order, customer));
// KStream-KStream Join:订单 + 支付KStream<String, Order> orders = builder.stream("orders");KStream<String, Payment> payments = builder.stream("payments");
KStream<String, OrderPayment> joined = orders.join( payments, (order, payment) -> new OrderPayment(order, payment), JoinWindows.of(Duration.ofMinutes(5)) // 5 分钟内匹配);六、生产部署
6.1 线程与任务模型
| 概念 | 说明 | 配置 |
|---|---|---|
| Task | 分区的处理单元 | = 输入 Topic 分区数 |
| Thread | 任务执行线程 | num.stream.threads |
| Instance | 应用实例 | 可水平扩展 |
# Kafka Streams 配置# 线程数(每个实例)num.stream.threads=4
# 状态目录state.dir=/var/lib/kafka-streams
# exactly-onceprocessing.guarantee=exactly_once_v2
# 缓存大小cache.max.bytes.buffering=10485760 # 10MB
# 提交间隔commit.interval.ms=1000
# 应用 ID(唯一标识)application.id=order-streams-app6.2 监控指标
| 指标 | 含义 | 告警条件 |
|---|---|---|
commit-latency-avg | 提交延迟 | > 1s |
process-rate | 处理速率 | 突然下降 |
commit-rate | 提交速率 | 过低 |
task-commit-latency-avg | 任务提交延迟 | > 500ms |
skipped-records-rate | 跳过记录率 | > 0 |
# 查看 Streams 应用状态kafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group order-streams-app
# JMX 监控# kafka.streams:type=stream-task-metrics,task-id=*# kafka.streams:type=stream-processor-metrics,processor-id=*Kafka Streams 的任务数等于输入 Topic 的分区数。如果分区数 > 实例数 × 线程数,部分任务会处于未分配状态。确保实例数 × 线程数 ≥ 分区数。
6.3 交互式查询
Kafka Streams 提供交互式查询(Interactive Queries)能力,允许外部应用直接查询状态存储中的数据,而无需将结果写回 Kafka 再由消费者读取:
// 启用交互式查询KTable<String, Double> customerTotals = orders .groupBy((key, order) -> KeyValue.pair(order.getCustomerId(), order.getAmount())) .aggregate( () -> 0.0, (customerId, amount, total) -> total + amount, Materialized.as("customer-totals-queryable-store") // 命名存储,可查询 );
// 查询本地状态存储ReadOnlyKeyValueStore<String, Double> store = kafkaStreams.store("customer-totals-queryable-store", QueryableStoreTypes.keyValueStore());
// 按 Key 查询Double total = store.get("customer-123");
// 范围查询KeyValueIterator<String, Double> range = store.range("customer-100", "customer-200");| 查询类型 | 方法 | 说明 |
|---|---|---|
| 点查询 | store.get(key) | 查询指定 Key 的值 |
| 范围查询 | store.range(from, to) | 查询 Key 范围内的所有记录 |
| 前缀扫描 | store.prefixScan(prefix) | 按前缀扫描(仅 KeyValueStore) |
| 全量遍历 | store.all() | 遍历所有记录(慎用) |
交互式查询只能查询本地实例的状态存储。如果需要查询的数据在其他实例上,需要通过 StreamsMetadata 定位到正确的实例,再通过 REST API 转发请求。
// 查找 Key 所在的实例StreamsMetadata metadata = kafkaStreams.streamsMetadataForStore("customer-totals-queryable-store", "customer-123");String host = metadata.host();int port = metadata.port();
// 通过 HTTP 转发查询请求到正确的实例// GET http://{host}:{port}/state/customer-totals-queryable-store/customer-1236.4 实战案例:实时订单监控
将前面所有概念组合起来,构建一个实时订单监控系统:
StreamsBuilder builder = new StreamsBuilder();
// 1. 读取订单事件流KStream<String, OrderEvent> orders = builder.stream("orders");
// 2. 过滤有效订单KStream<String, OrderEvent> validOrders = orders .filter((key, event) -> event.getAmount() != null && event.getAmount() > 0);
// 3. 按客户分组,5 分钟翻滚窗口统计KTable<Windowed<String>, OrderSummary> windowedSummary = validOrders .groupBy((key, event) -> KeyValue.pair(event.getCustomerId(), event)) .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1))) .aggregate( OrderSummary::new, (customerId, event, summary) -> summary.add(event), Materialized.with(Serdes.String(), new JsonSerde<>(OrderSummary.class)) );
// 4. 检测异常:5 分钟内消费超过阈值KStream<String, Alert> alerts = windowedSummary .toStream() .filter((windowedKey, summary) -> summary.getTotalAmount() > 10000) .map((windowedKey, summary) -> new KeyValue<>( windowedKey.key(), new Alert("HIGH_SPENDING", windowedKey.key(), summary.getTotalAmount()) ));
// 5. 输出告警alerts.to("order-alerts", Produced.with(Serdes.String(), new JsonSerde<>(Alert.class)));
// 6. 物化客户最新状态(可查询)KTable<String, CustomerStats> customerStats = validOrders .groupBy((key, event) -> KeyValue.pair(event.getCustomerId(), event)) .aggregate( CustomerStats::new, (customerId, event, stats) -> stats.update(event), Materialized.as("customer-stats-store") );七、总结
| 维度 | 关键要点 |
|---|---|
| KStream | 事件流抽象,每条记录都是新事件 |
| KTable | 物化视图抽象,相同 Key 是更新 |
| 流表二象性 | KStream 和 KTable 可相互转换 |
| 窗口 | Tumbling/Hopping/Session 三种窗口,优先使用事件时间 |
| 状态 | RocksDB + Changelog Topic 实现本地状态容错 |
| 部署 | Task = 分区数,线程数 × 实例数 ≥ 分区数 |
Kafka Streams 最适合”Kafka 到 Kafka”的流处理场景——无需额外集群,直接嵌入应用。如果需要复杂计算、多数据源或大规模处理,考虑 Flink 或 Spark Streaming。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






