Apache Pulsar — 云原生消息流
架构设计
Pulsar 最大的创新是存算分离架构,将消息路由(Broker)与消息存储(BookKeeper)彻底解耦。
Producer ──► Pulsar Broker(无状态)──► Apache BookKeeper(有状态存储)
↑ ↑
ZooKeeper(元数据) Bookie 节点集群
Consumer ──► Pulsar Broker ──► 从 BookKeeper 读取核心组件
| 组件 | 职责 |
|---|---|
| Broker | 无状态消息路由,负责 Producer/Consumer 连接管理 |
| BookKeeper | 分布式日志存储,Pulsar 的持久化层 |
| Bookie | BookKeeper 的存储节点 |
| ZooKeeper | 存储集群元数据、Broker 注册信息 |
| Pulsar Proxy | 可选,统一入口代理 |
存算分离的优势
传统 Kafka 架构:
Broker = 计算 + 存储(本地磁盘)
扩容时:必须同时扩计算和存储
数据迁移:Partition Rebalance 耗时长
Pulsar 架构:
Broker(无状态)← 独立水平扩展,秒级
BookKeeper(存储)← 独立扩展,无需数据迁移
Broker 宕机:立即切换到其他 Broker,无数据丢失多租户与命名空间
Pulsar 原生支持多租户,是其区别于 Kafka 的重要特性:
Pulsar 命名层级:
Tenant(租户)
└── Namespace(命名空间)
└── Topic(主题)
示例:
persistent://finance-team/payments/order-events
│ │ │ │
协议 租户 命名空间 Topic 名bash
# 创建租户
pulsar-admin tenants create finance-team \
--admin-roles admin \
--allowed-clusters us-east,us-west
# 创建命名空间
pulsar-admin namespaces create finance-team/payments \
--clusters us-east,us-west
# 设置命名空间策略
pulsar-admin namespaces set-retention finance-team/payments \
--size 10G --time 7d订阅模式
Pulsar 支持四种订阅模式,灵活覆盖各种消费场景:
Exclusive(独占)
Topic ──► Subscription ──► 单个 Consumer(独占)只允许一个消费者,适合有序处理。
Shared(共享)
Topic ──► Subscription ──► Consumer A(消息1,3,5)
──► Consumer B(消息2,4,6)多消费者轮询分发,提高吞吐,但不保证顺序。
Failover(故障转移)
Topic ──► Subscription ──► Consumer A(主消费者)
──► Consumer B(备用,A 故障时接管)Key_Shared(Key 共享)
Topic ──► Subscription ──► Consumer A(key=user-1 的所有消息)
──► Consumer B(key=user-2 的所有消息)相同 Key 的消息路由到同一消费者,保证 Key 级别的顺序性。
Java 客户端示例
java
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
// 生产者
Producer<String> producer = client.newProducer(Schema.STRING)
.topic("persistent://public/default/my-topic")
.compressionType(CompressionType.LZ4)
.sendTimeout(5, TimeUnit.SECONDS)
.create();
producer.newMessage()
.key("user-123") // 用于 Key_Shared 路由
.value("Hello Pulsar")
.property("source", "order-service")
.send();
// 消费者(Shared 模式)
Consumer<String> consumer = client.newConsumer(Schema.STRING)
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.messageListener((c, msg) -> {
System.out.println(msg.getValue());
c.acknowledgeAsync(msg);
})
.subscribe();Schema Registry
Pulsar 内置 Schema Registry,强制消息格式约束:
java
// 定义 Avro Schema
@Data
public class OrderEvent {
private String orderId;
private double amount;
private long timestamp;
}
// 使用 Schema 发送
Producer<OrderEvent> producer = client.newProducer(Schema.AVRO(OrderEvent.class))
.topic("order-events")
.create();
producer.send(new OrderEvent("ORD-001", 99.9, System.currentTimeMillis()));Schema 不兼容时,Broker 会拒绝消息,防止数据格式混乱。
Geo-Replication(跨地域复制)
US-East Cluster ──[异步复制]──► US-West Cluster
──[异步复制]──► EU-Central Cluster
配置:
pulsar-admin namespaces set-replication public/default \
--clusters us-east,us-west,eu-central消息在本地集群写入后,自动异步复制到其他集群,实现多活架构。
Pulsar Functions(轻量级流处理)
java
// 无需 Flink/Spark,直接在 Pulsar 内做简单流处理
public class WordCountFunction implements Function<String, Void> {
@Override
public Void process(String input, Context context) {
Arrays.stream(input.split("\\s+"))
.forEach(word -> context.incrCounter(word, 1));
return null;
}
}bash
# 部署 Function
pulsar-admin functions create \
--jar word-count.jar \
--classname WordCountFunction \
--inputs persistent://public/default/sentences \
--name word-count最佳实践
Topic 分区策略
bash
# 创建分区 Topic(提高并行度)
pulsar-admin topics create-partitioned-topic \
persistent://public/default/order-events \
--partitions 16
# 查看分区信息
pulsar-admin topics partitioned-stats \
persistent://public/default/order-events消息保留策略
bash
# 设置保留策略(已确认消息的保留)
pulsar-admin namespaces set-retention public/default \
--size 5G \ # 最多保留 5GB
--time 3d # 最多保留 3 天
# 设置 TTL(未确认消息的超时)
pulsar-admin namespaces set-message-ttl public/default --messageTTL 86400背压处理
java
// 生产者限速,防止 Broker 过载
Producer<byte[]> producer = client.newProducer()
.topic(topic)
.maxPendingMessages(1000) // 最大待发送消息数
.blockIfQueueFull(true) // 队列满时阻塞(而非抛异常)
.create();故障处理案例
案例一:Bookie 节点磁盘故障
现象:某 Bookie 节点磁盘损坏,该节点上的 Ledger 副本丢失。
Pulsar 自动处理:
- BookKeeper 检测到副本数不足(低于 Write Quorum)
- 自动从其他 Bookie 复制数据,恢复副本数
- 整个过程对 Producer/Consumer 透明
手动处理:
bash
# 查看 Bookie 状态
bookkeeper shell listbookies -rw
# 触发 Ledger 修复
bookkeeper shell bookiesanity案例二:Broker 负载不均
现象:部分 Broker 负载过高,部分空闲。
解决:Pulsar 的 Bundle 机制自动负载均衡:
bash
# 查看 Bundle 分布
pulsar-admin namespaces bundles public/default
# 手动触发 Bundle 卸载(迁移到其他 Broker)
pulsar-admin namespaces unload public/default案例三:消费者积压
现象:某 Subscription 积压大量未消费消息。
bash
# 查看积压情况
pulsar-admin topics stats persistent://public/default/my-topic
# 跳过积压消息(谨慎操作)
pulsar-admin topics skip \
--subscription my-sub \
--count 100000 \
persistent://public/default/my-topic
# 重置消费位点到指定时间
pulsar-admin topics reset-cursor \
--subscription my-sub \
--time 2h \
persistent://public/default/my-topic与 Kafka 对比
| 维度 | Kafka | Pulsar |
|---|---|---|
| 架构 | 存算耦合 | 存算分离 |
| 扩展性 | Partition Rebalance 慢 | Broker 无状态,秒级扩展 |
| 多租户 | 需要外部方案 | 原生支持 |
| 订阅模式 | Consumer Group | 4种订阅模式 |
| 跨地域复制 | MirrorMaker(复杂) | 原生 Geo-Replication |
| 消息保留 | 基于时间/大小 | 基于确认状态 + 时间/大小 |
| 生态 | 极其成熟 | 快速发展 |
| 适用场景 | 日志流、大数据 | 云原生、多租户、IoT |