Skip to content

Apache Pulsar — 云原生消息流

架构设计

Pulsar 最大的创新是存算分离架构,将消息路由(Broker)与消息存储(BookKeeper)彻底解耦。

Producer ──► Pulsar Broker(无状态)──► Apache BookKeeper(有状态存储)
                    ↑                         ↑
              ZooKeeper(元数据)         Bookie 节点集群
              
Consumer ──► Pulsar Broker ──► 从 BookKeeper 读取

核心组件

组件职责
Broker无状态消息路由,负责 Producer/Consumer 连接管理
BookKeeper分布式日志存储,Pulsar 的持久化层
BookieBookKeeper 的存储节点
ZooKeeper存储集群元数据、Broker 注册信息
Pulsar Proxy可选,统一入口代理

存算分离的优势

传统 Kafka 架构:
  Broker = 计算 + 存储(本地磁盘)
  扩容时:必须同时扩计算和存储
  数据迁移:Partition Rebalance 耗时长

Pulsar 架构:
  Broker(无状态)← 独立水平扩展,秒级
  BookKeeper(存储)← 独立扩展,无需数据迁移
  Broker 宕机:立即切换到其他 Broker,无数据丢失

多租户与命名空间

Pulsar 原生支持多租户,是其区别于 Kafka 的重要特性:

Pulsar 命名层级:
  Tenant(租户)
    └── Namespace(命名空间)
          └── Topic(主题)

示例:
  persistent://finance-team/payments/order-events
  │             │             │        │
  协议          租户          命名空间   Topic 名
bash
# 创建租户
pulsar-admin tenants create finance-team \
    --admin-roles admin \
    --allowed-clusters us-east,us-west

# 创建命名空间
pulsar-admin namespaces create finance-team/payments \
    --clusters us-east,us-west

# 设置命名空间策略
pulsar-admin namespaces set-retention finance-team/payments \
    --size 10G --time 7d

订阅模式

Pulsar 支持四种订阅模式,灵活覆盖各种消费场景:

Exclusive(独占)

Topic ──► Subscription ──► 单个 Consumer(独占)

只允许一个消费者,适合有序处理。

Shared(共享)

Topic ──► Subscription ──► Consumer A(消息1,3,5)
                       ──► Consumer B(消息2,4,6)

多消费者轮询分发,提高吞吐,但不保证顺序。

Failover(故障转移)

Topic ──► Subscription ──► Consumer A(主消费者)
                       ──► Consumer B(备用,A 故障时接管)

Key_Shared(Key 共享)

Topic ──► Subscription ──► Consumer A(key=user-1 的所有消息)
                       ──► Consumer B(key=user-2 的所有消息)

相同 Key 的消息路由到同一消费者,保证 Key 级别的顺序性。

Java 客户端示例

java
PulsarClient client = PulsarClient.builder()
    .serviceUrl("pulsar://localhost:6650")
    .build();

// 生产者
Producer<String> producer = client.newProducer(Schema.STRING)
    .topic("persistent://public/default/my-topic")
    .compressionType(CompressionType.LZ4)
    .sendTimeout(5, TimeUnit.SECONDS)
    .create();

producer.newMessage()
    .key("user-123")           // 用于 Key_Shared 路由
    .value("Hello Pulsar")
    .property("source", "order-service")
    .send();

// 消费者(Shared 模式)
Consumer<String> consumer = client.newConsumer(Schema.STRING)
    .topic("persistent://public/default/my-topic")
    .subscriptionName("my-subscription")
    .subscriptionType(SubscriptionType.Shared)
    .messageListener((c, msg) -> {
        System.out.println(msg.getValue());
        c.acknowledgeAsync(msg);
    })
    .subscribe();

Schema Registry

Pulsar 内置 Schema Registry,强制消息格式约束:

java
// 定义 Avro Schema
@Data
public class OrderEvent {
    private String orderId;
    private double amount;
    private long timestamp;
}

// 使用 Schema 发送
Producer<OrderEvent> producer = client.newProducer(Schema.AVRO(OrderEvent.class))
    .topic("order-events")
    .create();

producer.send(new OrderEvent("ORD-001", 99.9, System.currentTimeMillis()));

Schema 不兼容时,Broker 会拒绝消息,防止数据格式混乱。

Geo-Replication(跨地域复制)

US-East Cluster ──[异步复制]──► US-West Cluster
                ──[异步复制]──► EU-Central Cluster

配置:
pulsar-admin namespaces set-replication public/default \
    --clusters us-east,us-west,eu-central

消息在本地集群写入后,自动异步复制到其他集群,实现多活架构。

Pulsar Functions(轻量级流处理)

java
// 无需 Flink/Spark,直接在 Pulsar 内做简单流处理
public class WordCountFunction implements Function<String, Void> {
    @Override
    public Void process(String input, Context context) {
        Arrays.stream(input.split("\\s+"))
            .forEach(word -> context.incrCounter(word, 1));
        return null;
    }
}
bash
# 部署 Function
pulsar-admin functions create \
    --jar word-count.jar \
    --classname WordCountFunction \
    --inputs persistent://public/default/sentences \
    --name word-count

最佳实践

Topic 分区策略

bash
# 创建分区 Topic(提高并行度)
pulsar-admin topics create-partitioned-topic \
    persistent://public/default/order-events \
    --partitions 16

# 查看分区信息
pulsar-admin topics partitioned-stats \
    persistent://public/default/order-events

消息保留策略

bash
# 设置保留策略(已确认消息的保留)
pulsar-admin namespaces set-retention public/default \
    --size 5G \    # 最多保留 5GB
    --time 3d      # 最多保留 3 天

# 设置 TTL(未确认消息的超时)
pulsar-admin namespaces set-message-ttl public/default --messageTTL 86400

背压处理

java
// 生产者限速,防止 Broker 过载
Producer<byte[]> producer = client.newProducer()
    .topic(topic)
    .maxPendingMessages(1000)           // 最大待发送消息数
    .blockIfQueueFull(true)             // 队列满时阻塞(而非抛异常)
    .create();

故障处理案例

案例一:Bookie 节点磁盘故障

现象:某 Bookie 节点磁盘损坏,该节点上的 Ledger 副本丢失。

Pulsar 自动处理

  • BookKeeper 检测到副本数不足(低于 Write Quorum)
  • 自动从其他 Bookie 复制数据,恢复副本数
  • 整个过程对 Producer/Consumer 透明

手动处理

bash
# 查看 Bookie 状态
bookkeeper shell listbookies -rw

# 触发 Ledger 修复
bookkeeper shell bookiesanity

案例二:Broker 负载不均

现象:部分 Broker 负载过高,部分空闲。

解决:Pulsar 的 Bundle 机制自动负载均衡:

bash
# 查看 Bundle 分布
pulsar-admin namespaces bundles public/default

# 手动触发 Bundle 卸载(迁移到其他 Broker)
pulsar-admin namespaces unload public/default

案例三:消费者积压

现象:某 Subscription 积压大量未消费消息。

bash
# 查看积压情况
pulsar-admin topics stats persistent://public/default/my-topic

# 跳过积压消息(谨慎操作)
pulsar-admin topics skip \
    --subscription my-sub \
    --count 100000 \
    persistent://public/default/my-topic

# 重置消费位点到指定时间
pulsar-admin topics reset-cursor \
    --subscription my-sub \
    --time 2h \
    persistent://public/default/my-topic

与 Kafka 对比

维度KafkaPulsar
架构存算耦合存算分离
扩展性Partition Rebalance 慢Broker 无状态,秒级扩展
多租户需要外部方案原生支持
订阅模式Consumer Group4种订阅模式
跨地域复制MirrorMaker(复杂)原生 Geo-Replication
消息保留基于时间/大小基于确认状态 + 时间/大小
生态极其成熟快速发展
适用场景日志流、大数据云原生、多租户、IoT

PaaS 中间件生态系统深度学习文档