Skip to content

Apache Kafka — 分布式消息流平台

架构概览

Kafka 是 LinkedIn 开源的分布式消息流平台,以高吞吐、持久化、可回放为核心设计目标。

Producer ──► Broker1 (Leader)  ──► Broker2 (Follower)
             Broker3 (Leader)  ──► Broker1 (Follower)
             Broker2 (Leader)  ──► Broker3 (Follower)

             ZooKeeper / KRaft(元数据管理)

Consumer Group A ──► Consumer1(Partition 0,1)
                 ──► Consumer2(Partition 2,3)
Consumer Group B ──► Consumer3(所有 Partition,独立消费)

核心概念

概念说明
Topic消息的逻辑分类,类似数据库的表
PartitionTopic 的物理分区,消费并行度单元,有序
Offset消息在 Partition 内的唯一序号,单调递增
BrokerKafka 服务节点,每个 Partition 有一个 Leader Broker
Producer消息生产者,决定消息写入哪个 Partition
Consumer Group消费者组,组内每个 Partition 只被一个 Consumer 消费
ReplicaPartition 副本,分 Leader 和 Follower
ISRIn-Sync Replicas,与 Leader 保持同步的副本集合

存储机制

Topic: order-events(4个 Partition,3副本)

Broker1:
  order-events-0 (Leader)   → segment文件: 00000000000000000000.log
  order-events-1 (Follower)                00000000000001000000.log
  order-events-2 (Follower)                ...

每个 Segment 文件:
  .log   → 消息数据(顺序追加写)
  .index → 稀疏索引(offset → 文件位置)
  .timeindex → 时间索引(timestamp → offset)

顺序写盘是 Kafka 高吞吐的关键,配合 sendfile 零拷贝,单机可达百万级 msg/s。

生产者

基础配置

java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// 可靠性配置
props.put(ProducerConfig.ACKS_CONFIG, "all");          // 等待所有 ISR 副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);  // 幂等生产者(防重复)

// 性能配置
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);         // 批量大小 16KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);              // 等待 5ms 凑批
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");   // 压缩
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);   // 缓冲区 32MB

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

发送模式

java
ProducerRecord<String, String> record = new ProducerRecord<>(
    "order-events",   // topic
    "user-123",       // key(决定路由到哪个 Partition)
    orderJson         // value
);

// 异步发送(推荐)
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        log.error("Send failed", exception);
        // 重试或告警
    } else {
        log.debug("Sent to {}-{} offset={}", 
            metadata.topic(), metadata.partition(), metadata.offset());
    }
});

// 同步发送(低吞吐,用于关键消息)
RecordMetadata metadata = producer.send(record).get(5, TimeUnit.SECONDS);

// 发送后必须 flush,确保缓冲区消息都发出
producer.flush();

acks 参数详解

acks说明可靠性性能
0不等待确认,发完即走最低(可能丢失)最高
1等待 Leader 写入确认中(Leader 宕机可能丢失)
all/-1等待所有 ISR 副本确认最高最低

生产推荐acks=all + min.insync.replicas=2 + enable.idempotence=true

消费者

基础配置

java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

// Offset 提交策略
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // 关闭自动提交(推荐)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");  // 无 offset 时从头消费

// 拉取配置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);          // 每次最多拉 500 条
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);   // 两次 poll 最大间隔 5min
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);      // 心跳超时 30s

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("order-events"));

消费循环

java
try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        
        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
            } catch (Exception e) {
                // 处理失败:记录日志,发送到死信 Topic,或重试
                log.error("Failed to process record offset={}", record.offset(), e);
                sendToDeadLetterTopic(record);
            }
        }
        
        // 手动提交 offset(处理完后提交,保证 at-least-once)
        consumer.commitSync();
        
        // 或异步提交(性能更好)
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                log.warn("Async commit failed", exception);
            }
        });
    }
} finally {
    consumer.close();
}

精确一次语义(Exactly Once)

java
// 方案:消费 + 处理 + 生产 在同一事务中
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    producer.beginTransaction();
    try {
        for (ConsumerRecord<String, String> record : records) {
            // 处理并生产到下游 Topic
            ProducerRecord<String, String> output = transform(record);
            producer.send(output);
        }
        
        // 将 offset 提交到事务中(而非 __consumer_offsets)
        Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(records);
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        
        producer.commitTransaction();
    } catch (Exception e) {
        producer.abortTransaction();
        // 重置 offset,重新消费
        consumer.seekToBeginning(consumer.assignment());
    }
}

Partition 分配策略

java
// 生产者:自定义分区器
public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);
        if (keyBytes == null) {
            // 无 key:轮询(Kafka 2.4+ 默认粘性分区)
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // 有 key:哈希取模(保证同 key 进同 Partition)
        return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
    }
}

props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());
java
// 消费者:手动分配 Partition(不参与 Rebalance)
TopicPartition partition0 = new TopicPartition("order-events", 0);
TopicPartition partition1 = new TopicPartition("order-events", 1);
consumer.assign(List.of(partition0, partition1));

// 手动指定 offset
consumer.seek(partition0, 1000L);  // 从 offset 1000 开始消费

Rebalance 机制

Consumer Group 在以下情况触发 Rebalance:

  • 新 Consumer 加入
  • Consumer 宕机或超时(session.timeout.ms
  • 两次 poll() 间隔超过 max.poll.interval.ms
  • Topic Partition 数变化
Rebalance 协议(Cooperative Sticky,Kafka 2.4+):
  1. Consumer 向 Group Coordinator 发送 JoinGroup
  2. Coordinator 选出 Leader Consumer
  3. Leader 执行分区分配算法
  4. 所有 Consumer 收到新分配,只撤销需要转移的 Partition
  5. 继续消费(减少停顿)

旧协议(Eager):所有 Consumer 先停止消费,再重新分配(Stop-the-World)

减少 Rebalance 的实践

java
// 增大心跳超时,减少误判
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);

// 增大 poll 间隔(处理慢时避免被踢出)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);

// 减少每次 poll 的消息数(加快处理速度)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);

高可用配置

Broker 关键配置

properties
# server.properties

# 副本同步
default.replication.factor=3
min.insync.replicas=2          # 至少2个 ISR 副本才允许写入

# 日志保留
log.retention.hours=168        # 保留7天
log.retention.bytes=107374182400  # 或按大小(100GB)
log.segment.bytes=1073741824   # 单个 Segment 文件 1GB

# 性能
num.io.threads=8
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 副本拉取
replica.fetch.max.bytes=1048576
replica.lag.time.max.ms=30000  # Follower 落后超过 30s 移出 ISR

KRaft 模式(Kafka 3.3+,无 ZooKeeper)

properties
# 控制器节点配置
process.roles=controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093

# Broker 节点配置
process.roles=broker
node.id=4
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093

# 混合模式(小集群)
process.roles=broker,controller

Topic 管理

bash
# 创建 Topic
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create --topic order-events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config min.insync.replicas=2

# 查看 Topic 详情
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --topic order-events

# 扩容 Partition(只能增加,不能减少)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --alter --topic order-events --partitions 24

# 查看消费者 Lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group order-consumer-group

# 重置 offset(谨慎操作)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group order-consumer-group \
  --topic order-events \
  --reset-offsets --to-earliest --execute

Spring Boot 集成

yaml
# application.yml
spring:
  kafka:
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    producer:
      acks: all
      retries: 3
      batch-size: 16384
      linger-ms: 5
      compression-type: lz4
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      properties:
        enable.idempotence: true
    consumer:
      group-id: order-consumer-group
      auto-offset-reset: earliest
      enable-auto-commit: false
      max-poll-records: 500
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.example.dto"
    listener:
      ack-mode: manual_immediate  # 手动提交
      concurrency: 3              # 3个消费者线程
java
// 生产者
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;

public void publishOrderEvent(OrderEvent event) {
    kafkaTemplate.send("order-events", event.getOrderId(), event)
        .whenComplete((result, ex) -> {
            if (ex != null) {
                log.error("Failed to send order event", ex);
            }
        });
}

// 消费者
@KafkaListener(
    topics = "order-events",
    groupId = "order-consumer-group",
    containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
    ConsumerRecord<String, OrderEvent> record,
    Acknowledgment ack
) {
    try {
        orderService.process(record.value());
        ack.acknowledge();  // 手动提交 offset
    } catch (Exception e) {
        log.error("Failed to process order event offset={}", record.offset(), e);
        // 不 ack,触发重试(或发送到死信 Topic)
        ack.nack(Duration.ofSeconds(5));  // 5s 后重试
    }
}

死信队列(DLT)

java
// Spring Kafka 内置 DLT 支持
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<?, ?> template) {
    // 重试3次后发送到 DLT(order-events.DLT)
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    
    ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
    backOff.setInitialInterval(1000L);
    backOff.setMultiplier(2.0);
    backOff.setMaxInterval(10000L);
    
    return new DefaultErrorHandler(recoverer, backOff);
}

// 消费死信 Topic
@KafkaListener(topics = "order-events.DLT", groupId = "dlt-consumer")
public void handleDeadLetter(ConsumerRecord<String, OrderEvent> record) {
    log.error("Dead letter: topic={} partition={} offset={} key={}",
        record.topic(), record.partition(), record.offset(), record.key());
    // 告警、人工处理
    alertService.sendDeadLetterAlert(record);
}

故障处理案例

案例一:消费者 Lag 持续增长

现象kafka-consumer-groups.sh --describe 显示 LAG 持续增大。

排查

bash
# 查看各 Partition 的 Lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group order-consumer-group

# 查看消费者线程数
# 查看消费者处理耗时(应用日志)

解决

  1. 增加消费者实例(不超过 Partition 数)
  2. 增大 concurrency(并发消费线程)
  3. 优化消费者处理逻辑(异步化、批量处理)
  4. 临时跳过积压(业务允许时):--reset-offsets --to-latest

案例二:Producer 发送超时

现象:Producer 报 TimeoutException: Expiring X record(s) for topic-partition

原因

  • Broker 负载过高,响应慢
  • 网络问题
  • buffer.memory 满,新消息无法入队

解决

java
// 增大缓冲区和超时
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);  // 64MB
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000);      // 阻塞等待 60s
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);

案例三:Rebalance 频繁导致消费停顿

现象:日志频繁出现 Rebalancing...,消费延迟增大。

排查

bash
# 查看 Consumer Group 状态
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --describe --group order-consumer-group
# 关注 STATE 列:Stable / PreparingRebalance / CompletingRebalance

常见原因

  • 消费者处理太慢,超过 max.poll.interval.ms
  • GC 停顿导致心跳超时
  • 频繁部署(Consumer 实例频繁上下线)

解决

java
// 减少每次 poll 的消息量,加快处理
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
// 增大 poll 间隔
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
// 使用 Cooperative Sticky 分配策略(减少 Rebalance 影响范围)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
    CooperativeStickyAssignor.class.getName());

案例四:磁盘空间不足

bash
# 查看各 Topic 磁盘占用
kafka-log-dirs.sh --bootstrap-server kafka:9092 \
  --topic-list order-events --describe

# 临时缩短保留时间
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics --entity-name order-events \
  --alter --add-config retention.ms=86400000  # 改为1天

# 触发立即删除过期 Segment
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers --entity-name 1 \
  --alter --add-config log.retention.check.interval.ms=1000

监控指标

指标说明告警阈值
kafka_consumer_lag消费者 Lag> 10万
kafka_server_BrokerTopicMetrics_MessagesInPerSec消息写入速率接近上限
kafka_server_ReplicaManager_UnderReplicatedPartitions副本不足的 Partition 数> 0
kafka_controller_KafkaController_ActiveControllerCount活跃 Controller 数≠ 1
kafka_server_BrokerTopicMetrics_BytesInPerSec写入字节速率接近网卡上限
kafka_log_LogFlushRateAndTimeMs日志刷盘延迟P99 > 1s
kafka_network_RequestMetrics_RequestQueueTimeMs请求队列等待时间P99 > 100ms

Kafka vs 其他消息队列

维度KafkaRocketMQRabbitMQPulsar
吞吐量极高(百万/s)高(十万/s)中(万/s)极高
延迟中(5~15ms)低(<5ms)极低(<1ms)低(<5ms)
消息回放✅ 按 offset✅ 按时间
顺序消息Partition 内有序全局/分区有序Queue 内有序Key 有序
事务消息✅(0.11+)
延迟消息❌(需插件)✅(18级/任意)✅(插件)
多租户基础VHost✅ 原生
存算分离
适用场景日志流、大数据、事件溯源金融交易、顺序消息业务解耦、任务队列云原生、多租户

PaaS 中间件生态系统深度学习文档