LinkedIn 每天处理超过 7 万亿条消息——这个数字不是靠堆硬件撑起来的,而是靠架构设计。Kafka 用**分区(Partition)把吞吐量拆分到多台 Broker 上并行处理,用副本(Replica)保证单机故障不丢数据,用消费者组(Consumer Group)**让消费能力随节点数线性扩展。这三个概念构成了 Kafka 架构的铁三角,理解它们的协作方式,就理解了 Kafka 为什么能做到高吞吐与高可用兼得。
一、Kafka 架构总览
1.1 核心概念
Kafka 是一个分布式事件流平台,它的核心设计目标是:高吞吐、低延迟、持久化、水平可扩展。理解 Kafka 架构的关键在于把握三个层次:
| 层次 | 核心概念 | 作用 |
|---|---|---|
| 存储层 | Topic / Partition / Segment | 数据如何组织与存储 |
| 复制层 | Replica / ISR / Leader | 数据如何冗余与容错 |
| 消费层 | Consumer Group / Offset | 数据如何被消费与追踪 |
1.2 Broker 的角色
Broker 是 Kafka 集群中的一个节点,它的核心职责包括:
- 存储服务:每个 Broker 存储一部分 Partition 的数据
- 协调服务:Controller Broker 负责分区 Leader 选举、副本分配
- 客户端服务:处理生产者发送请求和消费者拉取请求
# 查看 Broker 信息kafka-broker-api-versions --bootstrap-server localhost:9092
# 查看 Controllerkafka-metadata --bootstrap-server localhost:9092 --describe
# Broker 关键配置# server.propertiesbroker.id=1listeners=PLAINTEXT://:9092log.dirs=/var/lib/kafka/datanum.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400Kafka 集群中只有一个 Controller Broker。Controller 负责管理分区状态、副本分配和 Leader 选举。如果 Controller 宕机,ZooKeeper/KRaft 会自动选举新的 Controller。
1.3 Topic 与 Partition
Topic 是逻辑上的消息分类,Partition 是物理上的数据分片。一个 Topic 被分成多个 Partition,分布在不同 Broker 上:
| 概念 | 说明 | 类比 |
|---|---|---|
| Topic | 消息的逻辑分类 | 数据库中的表 |
| Partition | Topic 的物理分片 | 数据库中的分片 |
| Offset | Partition 内消息的唯一标识 | 自增主键 |
| Segment | Partition 的物理文件 | WAL 日志段 |
// 创建 Topic:3 个分区,3 个副本Properties props = new Properties();props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) { NewTopic topic = new NewTopic("orders", 3, (short) 3); admin.createTopics(Collections.singletonList(topic)).all().get();}
// 查看分区详情DescribeTopicsResult result = admin.describeTopics( Collections.singletonList("orders"));result.topicNameValues().get("orders").get().partitions() .forEach(p -> { System.out.printf("Partition %d: Leader=%d, Replicas=%s, ISR=%s%n", p.partition(), p.leader().id(), p.replicas(), p.isr()); });二、分区机制深入
2.1 分区策略
生产者发送消息时,需要决定消息进入哪个分区。Kafka 提供了三种分区策略:
| 策略 | 实现方式 | 适用场景 | 顺序保证 |
|---|---|---|---|
| 指定分区 | ProducerRecord(topic, partition, ...) | 需要精确控制 | 分区内有序 |
| Key Hash | hash(key) % numPartitions | 需要相同 Key 进入同一分区 | Key 内有序 |
| 轮询(Round-Robin) | StickyPartitioner(默认) | 无 Key,均匀分布 | 无序 |
// 分区策略示例// 1. 指定分区ProducerRecord<String, String> record1 = new ProducerRecord<>("orders", 0, "key1", "value1");
// 2. Key Hash 分区ProducerRecord<String, String> record2 = new ProducerRecord<>("orders", "orderId-123", "value2");// Kafka 会计算 hash("orderId-123") % numPartitions
// 3. 无 Key 轮询ProducerRecord<String, String> record3 = new ProducerRecord<>("orders", "value3");// StickyPartitioner 会批量发送到同一分区,减少请求数
// 自定义分区器public class OrderPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 按订单类型分区:普通订单 → 0-2,VIP 订单 → 3-5 String orderType = extractOrderType(value.toString()); int numPartitions = cluster.partitionCountForTopic(topic); if ("VIP".equals(orderType)) { return 3 + Math.abs(key.hashCode() % 3); } return Math.abs(key.hashCode() % 3); }}2.2 分区数量选择
分区数量直接影响并行度和吞吐量,但不是越多越好:
| 因素 | 分区少 | 分区多 |
|---|---|---|
| 吞吐量 | 受限(并行度低) | 高(并行度高) |
| 顺序保证 | 范围大 | 范围小(仅分区内有序) |
| 文件句柄 | 少 | 多(每个分区多个文件) |
| 故障恢复 | 快 | 慢(更多 Leader 选举) |
| 内存开销 | 低 | 高(每个分区缓冲区) |
分区数量一旦增加就不能减少。增加分区会改变 Key 到分区的映射关系,导致相同 Key 的消息可能分布到不同分区,破坏顺序性。建议初始分区数按预期峰值设计,而非从少量开始逐步增加。
# 增加分区(只能增加,不能减少)kafka-topics --alter --bootstrap-server localhost:9092 \ --topic orders --partitions 6
# 分区数估算公式# 目标吞吐量 / 单分区吞吐量 = 所需分区数# 例:目标 100MB/s,单分区 10MB/s → 10 个分区
# 单分区吞吐量基准测试kafka-producer-perf-test --topic test \ --num-records 1000000 \ --record-size 1024 \ --throughput -1 \ --producer-props bootstrap.servers=localhost:90922.3 分区与 Key 的关系
三、副本机制与 ISR
3.1 副本模型
Kafka 通过副本(Replica)实现高可用。每个 Partition 有多个副本,分布在不同 Broker 上:
| 副本角色 | 职责 | 数量 |
|---|---|---|
| Leader Replica | 处理所有读写请求 | 每个分区 1 个 |
| Follower Replica | 从 Leader 拉取数据,不处理客户端请求 | 每个分区 N-1 个 |
| Preferred Leader | 优先选举为 Leader 的副本 | 配置指定 |
// 副本分配策略// 手动指定副本分布Map<Integer, List<Integer>> assignment = new HashMap<>();// Partition 0: Leader=0, Followers=1,2assignment.put(0, Arrays.asList(0, 1, 2));// Partition 1: Leader=1, Followers=2,0assignment.put(1, Arrays.asList(1, 2, 0));// Partition 2: Leader=2, Followers=0,1assignment.put(2, Arrays.asList(2, 0, 1));
admin.createPartitions( new CreatePartitionsOptions()).call(); // 使用 NewPartitions.increaseTo() 增加分区3.2 ISR(In-Sync Replicas)
ISR 是 Kafka 副本机制的核心概念。ISR 包含所有与 Leader 保持同步的副本集合:
ISR 的关键参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
replica.lag.time.max.ms | 10000 | Follower 落后超过此时间则移出 ISR |
min.insync.replicas | 1 | ISR 中最少副本数,低于此值拒绝写入 |
unclean.leader.election.enable | false | 是否允许非 ISR 副本成为 Leader |
# 查看 ISR 状态kafka-topics --describe --bootstrap-server localhost:9092 \ --topic orders
# 输出示例:# Topic: orders PartitionCount: 3 ReplicationFactor: 3# Topic: orders Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2# Topic: orders Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1# Topic: orders Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1
# 关键配置# 生产者 ACK + min.insync.replicas 配合保证可靠性# acks=all + min.insync.replicas=2 → 至少 2 个副本确认才返回成功3.3 Leader 选举与 Epoch
当 Leader 宕机时,Kafka 需要从 ISR 中选举新的 Leader。这个过程涉及多个 Epoch 概念:
| Epoch 类型 | 作用 | 递增时机 |
|---|---|---|
| Controller Epoch | 标识 Controller 代次 | Controller 变更 |
| Leader Epoch | 标识分区 Leader 代次 | Leader 切换 |
| Start Offset | 新 Leader 开始的 Offset | Leader 切换 |
// Leader 选举过程(简化版)// 1. Controller 检测到 Leader 宕机// 2. 从 ISR 中选择第一个存活的副本作为新 Leader// 3. 更新 ZooKeeper/KRaft 中的元数据// 4. 通知所有 Broker 新的 Leader 信息
// 使用 AdminClient 查看 Leader EpochDescribeTopicsResult desc = admin.describeTopics( Collections.singletonList("orders"));// LeaderAndIsr 包含:leader, leaderEpoch, isr, zkVersionKafka 使用 Leader Epoch 而非 High Watermark 来解决数据不一致问题。在旧版本中,Follower 重启后会截断到 High Watermark 位置,可能导致已提交的消息丢失。Leader Epoch 机制让 Follower 只截断到旧 Leader 的最后一个 Offset,避免了这个问题。
3.4 副本同步流程
四、消费者组深入
4.1 消费者组模型
消费者组是 Kafka 实现消费水平扩展的核心机制。同一个组内的消费者共同消费一个 Topic,每个分区只被组内一个消费者消费:
| 场景 | 消费者数 | 分区数 | 每个消费者消费的分区数 |
|---|---|---|---|
| 消费者 < 分区 | 2 | 6 | 3 |
| 消费者 = 分区 | 6 | 6 | 1 |
| 消费者 > 分区 | 8 | 6 | 6 个消费者各 1 个,2 个空闲 |
// 消费者组配置Properties props = new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-service");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Collections.singletonList("orders"));4.2 分区分配策略
当消费者组内成员变化时,需要重新分配分区。Kafka 提供了三种分配策略:
| 策略 | 算法 | 特点 | 适用场景 |
|---|---|---|---|
| RangeAssignor | 按分区范围均分 | 可能不均匀 | 默认策略 |
| RoundRobinAssignor | 轮询分配 | 均匀 | 多 Topic 场景 |
| StickyAssignor | 尽量保持原有分配 | 最少迁移 | 减少重平衡开销 |
| CooperativeStickyAssignor | 增量式重平衡 | 不停消费 | Kafka 2.4+ |
// RangeAssignor 示例// Topic: orders, 7 个分区, 3 个消费者// Consumer 0: Partition 0, 1, 2 (3个)// Consumer 1: Partition 3, 4 (2个)// Consumer 2: Partition 5, 6 (2个)
// RoundRobinAssignor 示例// Topic: orders, 7 个分区, 3 个消费者// Consumer 0: Partition 0, 3, 6 (3个)// Consumer 1: Partition 1, 4 (2个)// Consumer 2: Partition 2, 5 (2个)
// 设置分配策略props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
// 使用 CooperativeStickyAssignor(推荐)props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());4.3 Rebalance 机制
Rebalance 是消费者组内分区重新分配的过程,触发条件包括:
Rebalance 的关键参数:
| 参数 | 默认值 | 说明 |
|---|---|---|
session.timeout.ms | 45000 | 心跳超时,超过则认为消费者死亡 |
heartbeat.interval.ms | 3000 | 心跳发送间隔 |
max.poll.interval.ms | 300000 | 两次 poll 最大间隔 |
group.initial.rebalance.delay.ms | 3000 | 首次 Rebalance 延迟等待 |
# 查看消费者组状态kafka-consumer-groups --bootstrap-server localhost:9092 \ --describe --group order-service
# 输出示例:# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG# order-service orders 0 1000 1050 50# order-service orders 1 980 1020 40# order-service orders 2 950 1000 50
# 重置消费者组 Offsetkafka-consumer-groups --bootstrap-server localhost:9092 \ --group order-service --topic orders \ --reset-offsets --to-earliest --executeRebalance 期间消费者无法消费消息(Eager 模式),频繁 Rebalance 会导致消费延迟。常见原因:消费者处理时间超过 max.poll.interval.ms、GC 停顿、网络抖动。建议使用 CooperativeStickyAssignor 并调大 max.poll.interval.ms。
4.4 Offset 管理
Offset 记录了消费者组在每个分区上的消费进度:
// 手动提交 Offsetwhile (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) { // 处理消息 processOrder(record.value()); }
// 同步提交(阻塞等待确认) consumer.commitSync();
// 或异步提交(不等待确认) consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { log.error("提交 Offset 失败", exception); } } });}
// 精确提交 Offset(处理完每条消息后)Map<TopicPartition, OffsetAndMetadata> commitMap = new HashMap<>();for (ConsumerRecord<String, String> record : records) { processOrder(record.value()); commitMap.put( new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1) );}consumer.commitSync(commitMap);五、Controller 与协调服务
5.1 Controller 职责
Controller 是 Kafka 集群的大脑,负责管理集群元数据和协调操作:
| 职责 | 说明 |
|---|---|
| 分区 Leader 选举 | Broker 宕机时选举新 Leader |
| 副本分配 | 新建 Topic 时分配副本到 Broker |
| 分区重分配 | 扩容时迁移分区数据 |
| Preferred Leader 选举 | 恢复原始 Leader 分配 |
| Topic 管理 | 创建/删除 Topic |
| ISR 管理 | 更新 ISR 集合 |
# Controller 相关操作# 查看当前 Controllerkafka-metadata --bootstrap-server localhost:9092 --describe
# 手动触发 Preferred Leader 选举kafka-leader-election --bootstrap-server localhost:9092 \ --election-type preferred \ --topic orders --partition 0
# 分区重分配cat > reassignment.json << 'EOF'{ "partitions": [ {"topic": "orders", "partition": 0, "replicas": [3, 1, 2]}, {"topic": "orders", "partition": 1, "replicas": [1, 2, 3]} ]}EOF
kafka-reassign-partitions --bootstrap-server localhost:9092 \ --reassignment-json-file reassignment.json \ --execute5.2 KRaft 模式(ZooKeeper 替代)
Kafka 2.8+ 引入 KRaft 模式,不再依赖 ZooKeeper:
| 维度 | ZooKeeper 模式 | KRaft 模式 |
|---|---|---|
| 元数据存储 | ZooKeeper 集群 | 内置 Raft 共识 |
| 运维复杂度 | 两套集群 | 一套集群 |
| Controller 选举 | ZooKeeper 临时节点 | Raft 协议 |
| 分区限制 | ~200,000 | 数百万 |
| 元数据传播 | 逐个 Broker 更新 | 批量广播 |
# KRaft 模式初始化kafka-storage format -t $(uuidgen) -c server.properties
# KRaft 模式启动kafka-server-start.sh server.properties
# KRaft 元数据查询kafka-metadata-quorum --bootstrap-server localhost:9092 describe --status六、生产环境最佳实践
6.1 分区数规划
| 场景 | 推荐分区数 | 依据 |
|---|---|---|
| 低流量日志收集 | 6-12 | 单分区 10MB/s 足够 |
| 中等流量业务事件 | 12-24 | 考虑消费者并行度 |
| 高流量交易数据 | 24-72 | 需要高吞吐 |
| 超大规模数据管道 | 100+ | 需要仔细评估 |
6.2 副本配置建议
# 生产环境推荐配置# 副本数 = 3(平衡可靠性与成本)# min.insync.replicas = 2(允许 1 个副本故障)# acks = all(确保消息写入所有 ISR 副本)
# Broker 配置default.replication.factor=3min.insync.replicas=2unclean.leader.election.enable=false
# Producer 配置acks=allretries=2147483647max.in.flight.requests.per.connection=5enable.idempotence=true
# Consumer 配置enable.auto.commit=falseauto.offset.reset=earliestmax.poll.records=500max.poll.interval.ms=300000session.timeout.ms=300006.3 监控关键指标
| 指标 | 含义 | 告警阈值 |
|---|---|---|
UnderReplicatedPartitions | ISR < AR 的分区数 | > 0 |
OfflinePartitionsCount | 无 Leader 的分区数 | > 0 |
ActiveControllerCount | 活跃 Controller 数 | ≠ 1 |
ConsumerLag | 消费者落后量 | > 10000 |
RequestHandlerAvgIdlePercent | 请求处理器空闲率 | < 0.3 |
// 使用 Kafka Metrics API 监控Map<MetricName, ? extends Metric> metrics = kafkaConsumer.metrics();metrics.forEach((name, metric) -> { if (name.name().equals("records-lag-max")) { System.out.printf("Consumer Lag: %s%n", metric.metricValue()); }});
// JMX 监控关键 MBean// kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions// kafka.controller:type=KafkaController,name=ActiveControllerCount// kafka.consumer:type=consumer-fetch-manager-metrics,client-id=*使用 kafka-consumer-groups --describe 定期检查 Consumer Lag。如果 Lag 持续增长,说明消费速度跟不上生产速度,需要增加消费者或优化消费逻辑。
七、架构对比总结
| 维度 | Kafka | RabbitMQ | RocketMQ | Pulsar |
|---|---|---|---|---|
| 数据模型 | Topic/Partition/Offset | Exchange/Queue/Binding | Topic/Queue/Tag | Topic/Subscription/Cursor |
| 消费模型 | 拉取(Pull) | 推送(Push) | 推拉结合 | 拉取(Pull) |
| 分区/队列 | Partition | Queue | MessageQueue | Segment |
| 消费者组 | Consumer Group | 无原生支持 | Consumer Group | Subscription |
| 副本机制 | ISR | 镜像队列 | Dledger | BookKeeper |
| 顺序保证 | 分区内有序 | 队列有序 | 队列有序 | 分区内有序 |
八、总结
上一章建立了消息语义与可靠性的认知框架。
| 维度 | 关键要点 |
|---|---|
| 分区 | 分区是并行度和吞吐量的基础,分区数需提前规划,只能增不能减 |
| 副本 | 副本保证高可用,ISR 机制确保数据一致性,min.insync.replicas + acks=all 是可靠性保证 |
| 消费者组 | 消费者组实现消费水平扩展,Rebalance 是关键挑战,推荐 CooperativeStickyAssignor |
| Controller | Controller 是集群大脑,KRaft 模式替代 ZooKeeper 降低运维复杂度 |
| 监控 | UnderReplicatedPartitions、ConsumerLag 是最关键的监控指标 |
Kafka 架构的核心思想是”分区即并行”——通过分区实现水平扩展,通过副本实现高可用,通过消费者组实现消费扩展。理解这三者的关系,就理解了 Kafka 的设计哲学。
支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时






