mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4
2130 字
6 分钟
Pulsar:分层架构与 BookKeeper
2026-04-14

Kafka 已经统治了消息系统领域十年——为什么还要造一个 Pulsar?答案藏在 Kafka 的一个架构缺陷里:Broker 同时负责计算和存储。这意味着扩容 Broker 必须迁移数据(耗时数小时),Broker 故障恢复需要等待分区重新分配和数据复制。Pulsar 的分层架构把计算(无状态 Broker)和存储(BookKeeper)彻底分离——Broker 扩容秒级完成,因为数据不在 Broker 上;Broker 故障恢复只需把 Topic 重新绑定到新 Broker,零数据迁移。

一、Pulsar 分层架构#

1.1 为什么需要分层?#

传统消息系统(Kafka、RocketMQ)的 Broker 同时负责计算和存储,这带来了几个问题:

问题说明
扩容耦合存储扩容必须迁移数据,耗时且风险高
故障恢复慢Broker 故障需要迁移分区和数据
运维复杂存储和计算需要同步规划

Pulsar 的分层架构将计算(Broker)和存储(BookKeeper)分离:

graph TB subgraph "计算层(无状态)" B1["Broker 1"] B2["Broker 2"] B3["Broker 3"] end subgraph "存储层(有状态)" BK1["Bookie 1"] BK2["Bookie 2"] BK3["Bookie 3"] BK4["Bookie 4"] end P["Producer"] --> B1 C["Consumer"] --> B2 B1 -->|"写入"| BK1 B1 -->|"写入"| BK2 B1 -->|"写入"| BK3 B2 -->|"读取"| BK1 B2 -->|"读取"| BK2 ZK["ZooKeeper<br/>(元数据)"] --- B1 ZK --- B2 ZK --- BK1
组件职责状态
计算层Broker消息路由、协议适配、流量控制无状态
存储层BookKeeper (Bookie)消息持久化、副本复制有状态
协调层ZooKeeper元数据、Leader 选举、服务发现有状态
Note

Pulsar 的分层架构灵感来自”存算分离”的数据库设计——计算节点无状态,可以随时扩缩容;存储节点独立扩展,数据不需要迁移。这使得 Pulsar 天然适合云环境和 Kubernetes 部署。

1.2 与 Kafka 架构对比#

Note

Pulsar 的分层架构是它与传统消息队列最大的区别:计算层(Broker)无状态,可以快速扩缩容;存储层(BookKeeper)有状态但独立扩展。Broker 宕机不影响数据安全——新 Broker 可以直接从 BookKeeper 恢复所有 Topic 的元数据和游标。对比 Kafka 的 Broker 既是计算又是存储,扩容需要迁移 Partition 数据,耗时且影响性能。

维度KafkaPulsar
架构Broker = 计算 + 存储Broker(计算)+ Bookie(存储)
扩容分区迁移(耗时)Broker 无状态,秒级扩容
故障恢复分区 Leader 切换 + 数据同步Broker 切换(无数据迁移)
存储扩展增加 Broker + 迁移分区增加 Bookie(自动均衡)
多租户不支持原生支持(Tenant/Namespace)
消息保留基于时间/大小基于时间/大小/策略

二、BookKeeper 深入#

2.1 BookKeeper 架构#

BookKeeper 是 Pulsar 的存储引擎,基于预写日志(WAL)和 Ledger 抽象:

graph TB subgraph "BookKeeper 集群" BK1["Bookie 1<br/>Ledger L1 (ensemble)"] BK2["Bookie 2<br/>Ledger L1 (ensemble)"] BK3["Bookie 3<br/>Ledger L1 (ensemble)"] end W["Writer<br/>(Pulsar Broker)"] -->|"Entry 1"| BK1 W -->|"Entry 2"| BK2 W -->|"Entry 3"| BK3 W -->|"Entry 4"| BK1 ZK["ZooKeeper"] -->|"Ledger 元数据"| W R["Reader"] -->|"读取 Entry"| BK1 R -->|"读取 Entry"| BK2
概念说明类比
Ledger只追加的日志序列Kafka 的 Partition
EntryLedger 中的一条记录Kafka 的 Record
EnsembleLedger 的 Bookie 集合Kafka 的副本集合
Write Quorum (Wq)每条 Entry 写入的副本数Kafka 的 min.insync.replicas
Ack Quorum (Aq)确认写入需要的 ACK 数Kafka 的 acks

2.2 Ledger 写入流程#

sequenceDiagram participant W as Writer (Broker) participant BK1 as Bookie 1 participant BK2 as Bookie 2 participant BK3 as Bookie 3 participant ZK as ZooKeeper Note over W,ZK: 参数: E=3, Wq=3, Aq=2 W->>BK1: Entry 1 (write) W->>BK2: Entry 1 (write) W->>BK3: Entry 1 (write) BK1-->>W: ACK BK2-->>W: ACK Note over W: Aq=2,收到 2 个 ACK 即可 W->>W: 确认 Entry 1 写入成功 W->>BK2: Entry 2 (write) W->>BK3: Entry 2 (write) W->>BK1: Entry 2 (write) BK2-->>W: ACK BK3-->>W: ACK Note over W: 确认 Entry 2 写入成功
// BookKeeper 客户端写入
BookKeeper bk = new BookKeeper(zkConnectString);
// 创建 Ledger
LedgerHandle ledger = bk.createLedger(
3, // ensembleSize: Bookie 数量
3, // writeQuorumSize: 写入副本数
2, // ackQuorumSize: 确认副本数
BookKeeper.DigestType.CRC32,
"password".getBytes()
);
// 写入 Entry
long entryId = ledger.addEntry("Hello Pulsar".getBytes());
// 关闭 Ledger
ledger.close();

2.3 关键参数关系#

参数关系说明
E (Ensemble)E ≥ WqLedger 分布的 Bookie 数
Wq (Write Quorum)Wq ≥ Aq每条 Entry 的写入副本数
Aq (Ack Quorum)Aq ≥ 1确认写入需要的 ACK 数
容忍故障数E - Wq + (Wq - Aq)可以容忍的 Bookie 故障数
配置示例EWqAq容忍故障写入延迟
高吞吐3212低(1 个 ACK)
均衡3321中(2 个 ACK)
高可靠5432高(3 个 ACK)

三、Topic 与订阅模式#

3.1 Topic 层次结构#

Pulsar 的 Topic 采用层次化命名:

persistent://tenant/namespace/topic
| | | |
| | | └── Topic 名称
| | └── 命名空间(隔离单元)
| └── 租户(多租户隔离)
└── 持久化类型(persistent / non-persistent)
层级说明示例
Tenant租户,资源隔离的最高层级acme-corp
Namespace命名空间,策略配置的单元acme-corp/production
Topic消息主题acme-corp/production/orders
# 创建租户
pulsar-admin tenants create acme-corp \
--admin-roles admin \
--allowed-clusters us-east,us-west
# 创建命名空间
pulsar-admin namespaces create acme-corp/production \
--clusters us-east,us-west \
--retention 7d
# 创建 Topic
pulsar-admin topics create persistent://acme-corp/production/orders \
--partitions 8

3.2 四种订阅模式#

Pulsar 支持四种订阅模式,这是它区别于 Kafka 的重要特性:

订阅模式消费模型类比适用场景
Exclusive单消费者独占Kafka 单消费者顺序处理
Failover主备消费者Kafka 主备高可用
Shared轮询分发Kafka 消费者组高吞吐
Key_Shared按 Key 分发Kafka 分区分配Key 有序 + 并发
graph TB subgraph "Exclusive" T1["Topic"] --> C1["Consumer<br/>(独占)"] end subgraph "Failover" T2["Topic"] --> C2["Consumer A<br/>(Active)"] T2 -.-x C3["Consumer B<br/>(Standby)"] end subgraph "Shared" T3["Topic"] -->|"msg 1"| C4["Consumer A"] T3 -->|"msg 2"| C5["Consumer B"] T3 -->|"msg 3"| C6["Consumer C"] end subgraph "Key_Shared" T4["Topic"] -->|"Key=A"| C7["Consumer A<br/>(Key=A,C)"] T4 -->|"Key=B"| C8["Consumer B<br/>(Key=B,D)"] T4 -->|"Key=C"| C7 T4 -->|"Key=D"| C8 end
// 创建不同订阅模式的消费者
Consumer<byte[]> exclusiveConsumer = client.newConsumer()
.topic("persistent://acme-corp/production/orders")
.subscriptionName("order-processor")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
Consumer<byte[]> sharedConsumer = client.newConsumer()
.topic("persistent://acme-corp/production/orders")
.subscriptionName("order-processor")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<byte[]> keySharedConsumer = client.newConsumer()
.topic("persistent://acme-corp/production/orders")
.subscriptionName("order-processor")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();

3.3 订阅模式对比#

维度ExclusiveFailoverSharedKey_Shared
并发度11(主)NN
有序性全局有序全局有序无序Key 内有序
容错自动切换自动重分配自动重分配
吞吐量
消息确认逐条逐条逐条逐条
Warning

Shared 模式下消息无序——Consumer A 可能先收到 msg2,Consumer B 后收到 msg1。如果需要 Key 级别有序,使用 Key_Shared 模式。

四、Geo 复制#

4.1 Geo 复制架构#

Pulsar 原生支持跨数据中心的 Geo 复制:

graph TB subgraph "us-east 集群" BE1["Broker East"] BKE1["Bookie East"] end subgraph "us-west 集群" BW1["Broker West"] BKW1["Bookie West"] end subgraph "eu-central 集群" BC1["Broker EU"] BKC1["Bookie EU"] end BE1 <-->|"Geo 复制"| BW1 BE1 <-->|"Geo 复制"| BC1 BW1 <-->|"Geo 复制"| BC1 P1["Producer (US)"] --> BE1 P2["Producer (EU)"] --> BC1 C1["Consumer (US)"] --> BE1 C2["Consumer (EU)"] --> BC1
复制模式说明适用场景
同步复制等待所有集群确认金融交易
异步复制本地确认后异步复制日志、事件
双活两个集群同时读写就近访问
# 配置 Geo 复制
# 1. 创建全局命名空间
pulsar-admin namespaces create acme-corp/global-orders \
--clusters us-east,us-west,eu-central
# 2. 设置复制策略
pulsar-admin namespaces set-replication-clusters acme-corp/global-orders \
--clusters us-east,us-west,eu-central
# 3. 查看复制状态
pulsar-admin namespaces get-replication-clusters acme-corp/global-orders

4.2 Geo 复制实现#

// Producer:发送消息到本地集群,自动复制到远程
Producer<byte[]> producer = client.newProducer()
.topic("persistent://acme-corp/global-orders/orders")
.create();
// 消息会自动复制到所有配置的集群
producer.send("Order data".getBytes());
// Consumer:从最近的集群消费
Consumer<byte[]> consumer = client.newConsumer()
.topic("persistent://acme-corp/global-orders/orders")
.subscriptionName("order-processor")
.subscribe();
// 消费者会收到来自所有集群的消息(去重后)
Message<byte[]> msg = consumer.receive();

五、多租户#

5.1 多租户模型#

Pulsar 的多租户是原生设计,从底层就支持资源隔离:

层级隔离维度配置
Tenant认证授权、资源配额pulsar-admin tenants
Namespace策略(保留、限流、复制)pulsar-admin namespaces
Topic消息存储与消费pulsar-admin topics
# 创建租户并设置配额
pulsar-admin tenants create acme-corp \
--admin-roles admin@acme \
--allowed-clusters us-east
# 设置命名空间配额
pulsar-admin namespaces set-quota acme-corp/production \
--max-producers-per-topic 50 \
--max-consumers-per-topic 100 \
--max-consumers-per-subscription 50
# 设置消息保留策略
pulsar-admin namespaces set-retention acme-corp/production \
--size 100G \
--time 7d
# 设置限流
pulsar-admin namespaces set-rate-limit acme-corp/production \
--publish-rate 1000 \
--subscribe-rate 1000

5.2 资源隔离#

# Broker 隔离:指定命名空间只能使用特定 Broker
pulsar-admin namespaces set-bundle-range acme-corp/production \
--broker-affinity-group premium-brokers
# Bookie 隔离:指定命名空间只能使用特定 Bookie
pulsar-admin namespaces set-bookie-affinity-group acme-corp/production \
--bookie-affinity-group premium-bookies \
--primary-bookie-group us-east-premium
隔离类型说明适用场景
Broker 隔离不同租户使用不同 Broker延迟敏感租户
Bookie 隔离不同租户使用不同 Bookie数据合规要求
网络隔离不同租户使用不同网络安全合规

六、Pulsar Functions#

6.1 轻量级流处理#

Pulsar Functions 是内置于 Pulsar 的轻量级流处理框架:

// Pulsar Function:订单金额过滤
public class OrderFilterFunction implements Function<Order, Order> {
@Override
public Order process(Order input, Context context) {
if (input.getAmount() > 1000) {
context.publish("high-value-orders", input);
context.incrCounter("high-value-count", 1);
}
return input;
}
}
# 部署 Function
pulsar-admin functions create \
--name order-filter \
--inputs persistent://acme-corp/production/orders \
--output persistent://acme-corp/production/filtered-orders \
--classname com.example.OrderFilterFunction \
--jar /path/to/function.jar
# 查看状态
pulsar-admin functions status --name order-filter
特性Pulsar FunctionsKafka StreamsFlink
部署内置嵌入应用独立集群
语言Java/Python/GoJavaJava/Python
状态内置(BookKeeper)RocksDBRocksDB
运维简单简单复杂

七、Pulsar vs Kafka 全面对比#

维度PulsarKafka
架构分层(Broker + Bookie)单层(Broker = 计算 + 存储)
扩容Broker 秒级扩容分区迁移,分钟级
订阅模式4 种(Exclusive/Failover/Shared/Key_Shared)1 种(Consumer Group)
多租户原生支持不支持
Geo 复制原生支持MirrorMaker 2
消息保留灵活(分层存储)基于时间/大小
分层存储S3/冷热分离不支持
流处理Pulsar FunctionsKafka Streams
生态成长中成熟
社区ApacheApache

7.1 分层存储#

Pulsar 支持分层存储(Tiered Storage),将冷数据自动迁移到低成本存储:

# 配置分层存储(S3)
pulsar-admin namespaces set-offload-policies acme-corp/production \
--offload-threshold 100MB \
--offload-deletion-lag 2d \
--s3-endpoint s3.amazonaws.com \
--s3-bucket pulsar-offload \
--s3-region us-east-1
存储层延迟成本数据热度
Bookie(热)毫秒最近数据
S3/GCS(冷)百毫秒历史数据

分层存储的核心优势是:Broker 仍然可以透明地读取冷数据,消费者无需关心数据存储在哪一层。当消费者请求的数据在 S3 上时,Broker 会自动从 S3 下载并返回,对上层应用完全透明。这使得 Pulsar 可以用极低的成本保留海量历史数据,同时保持热数据的高性能访问。

Tip

Pulsar 的核心优势是”云原生”——分层架构使得计算和存储可以独立扩展,多租户和 Geo 复制是原生能力。如果你的系统需要多租户、跨区域、弹性伸缩,Pulsar 比 Kafka 更合适。但如果你的场景是高吞吐日志流处理,Kafka 的成熟生态和性能仍然领先。

八、总结#

上一章深入解读了RocketMQ 事务与延迟消息的内部机制。

维度关键要点
分层架构Broker(无状态计算)+ Bookie(有状态存储),存算分离
BookKeeperLedger + Entry + Quorum 写入,保证数据持久化
订阅模式4 种订阅模式,灵活的消费模型
Geo 复制原生跨数据中心复制,支持同步/异步
多租户Tenant → Namespace → Topic 层次化隔离
Pulsar Functions内置轻量级流处理,无需额外集群
Note

Pulsar 的 Broker 和 Bookie 都是 Java 实现,生产环境部署需要 JDK 17+ 和足够的堆内存(Broker 建议 4-8GB,Bookie 建议 2-4GB)。如果你对 JVM 调优不熟悉,运维成本会比 Kafka(也是 JVM)更高。社区正在推进 Go 版本的 Broker(pulsar-broker-go),但尚未生产可用。

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

Pulsar:分层架构与 BookKeeper
https://blog.souloss.com/posts/messaging/pulsar/
作者
Souloss
发布于
2026-04-14
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时