Apache Kafka — 分布式消息流平台
架构概览
Kafka 是 LinkedIn 开源的分布式消息流平台,以高吞吐、持久化、可回放为核心设计目标。
Producer ──► Broker1 (Leader) ──► Broker2 (Follower)
Broker3 (Leader) ──► Broker1 (Follower)
Broker2 (Leader) ──► Broker3 (Follower)
↑
ZooKeeper / KRaft(元数据管理)
↓
Consumer Group A ──► Consumer1(Partition 0,1)
──► Consumer2(Partition 2,3)
Consumer Group B ──► Consumer3(所有 Partition,独立消费)核心概念
| 概念 | 说明 |
|---|---|
| Topic | 消息的逻辑分类,类似数据库的表 |
| Partition | Topic 的物理分区,消费并行度单元,有序 |
| Offset | 消息在 Partition 内的唯一序号,单调递增 |
| Broker | Kafka 服务节点,每个 Partition 有一个 Leader Broker |
| Producer | 消息生产者,决定消息写入哪个 Partition |
| Consumer Group | 消费者组,组内每个 Partition 只被一个 Consumer 消费 |
| Replica | Partition 副本,分 Leader 和 Follower |
| ISR | In-Sync Replicas,与 Leader 保持同步的副本集合 |
存储机制
Topic: order-events(4个 Partition,3副本)
Broker1:
order-events-0 (Leader) → segment文件: 00000000000000000000.log
order-events-1 (Follower) 00000000000001000000.log
order-events-2 (Follower) ...
每个 Segment 文件:
.log → 消息数据(顺序追加写)
.index → 稀疏索引(offset → 文件位置)
.timeindex → 时间索引(timestamp → offset)顺序写盘是 Kafka 高吞吐的关键,配合 sendfile 零拷贝,单机可达百万级 msg/s。
生产者
基础配置
java
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 可靠性配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等待所有 ISR 副本确认
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // 幂等生产者(防重复)
// 性能配置
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批量大小 16KB
props.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 等待 5ms 凑批
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // 压缩
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 缓冲区 32MB
KafkaProducer<String, String> producer = new KafkaProducer<>(props);发送模式
java
ProducerRecord<String, String> record = new ProducerRecord<>(
"order-events", // topic
"user-123", // key(决定路由到哪个 Partition)
orderJson // value
);
// 异步发送(推荐)
producer.send(record, (metadata, exception) -> {
if (exception != null) {
log.error("Send failed", exception);
// 重试或告警
} else {
log.debug("Sent to {}-{} offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
}
});
// 同步发送(低吞吐,用于关键消息)
RecordMetadata metadata = producer.send(record).get(5, TimeUnit.SECONDS);
// 发送后必须 flush,确保缓冲区消息都发出
producer.flush();acks 参数详解
| acks | 说明 | 可靠性 | 性能 |
|---|---|---|---|
0 | 不等待确认,发完即走 | 最低(可能丢失) | 最高 |
1 | 等待 Leader 写入确认 | 中(Leader 宕机可能丢失) | 中 |
all/-1 | 等待所有 ISR 副本确认 | 最高 | 最低 |
生产推荐:acks=all + min.insync.replicas=2 + enable.idempotence=true
消费者
基础配置
java
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-consumer-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Offset 提交策略
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 关闭自动提交(推荐)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 无 offset 时从头消费
// 拉取配置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每次最多拉 500 条
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 两次 poll 最大间隔 5min
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); // 心跳超时 30s
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("order-events"));消费循环
java
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
try {
processRecord(record);
} catch (Exception e) {
// 处理失败:记录日志,发送到死信 Topic,或重试
log.error("Failed to process record offset={}", record.offset(), e);
sendToDeadLetterTopic(record);
}
}
// 手动提交 offset(处理完后提交,保证 at-least-once)
consumer.commitSync();
// 或异步提交(性能更好)
consumer.commitAsync((offsets, exception) -> {
if (exception != null) {
log.warn("Async commit failed", exception);
}
});
}
} finally {
consumer.close();
}精确一次语义(Exactly Once)
java
// 方案:消费 + 处理 + 生产 在同一事务中
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
// 处理并生产到下游 Topic
ProducerRecord<String, String> output = transform(record);
producer.send(output);
}
// 将 offset 提交到事务中(而非 __consumer_offsets)
Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(records);
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
producer.commitTransaction();
} catch (Exception e) {
producer.abortTransaction();
// 重置 offset,重新消费
consumer.seekToBeginning(consumer.assignment());
}
}Partition 分配策略
java
// 生产者:自定义分区器
public class OrderPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
int numPartitions = cluster.partitionCountForTopic(topic);
if (keyBytes == null) {
// 无 key:轮询(Kafka 2.4+ 默认粘性分区)
return ThreadLocalRandom.current().nextInt(numPartitions);
}
// 有 key:哈希取模(保证同 key 进同 Partition)
return Math.abs(Utils.murmur2(keyBytes)) % numPartitions;
}
}
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());java
// 消费者:手动分配 Partition(不参与 Rebalance)
TopicPartition partition0 = new TopicPartition("order-events", 0);
TopicPartition partition1 = new TopicPartition("order-events", 1);
consumer.assign(List.of(partition0, partition1));
// 手动指定 offset
consumer.seek(partition0, 1000L); // 从 offset 1000 开始消费Rebalance 机制
Consumer Group 在以下情况触发 Rebalance:
- 新 Consumer 加入
- Consumer 宕机或超时(
session.timeout.ms) - 两次
poll()间隔超过max.poll.interval.ms - Topic Partition 数变化
Rebalance 协议(Cooperative Sticky,Kafka 2.4+):
1. Consumer 向 Group Coordinator 发送 JoinGroup
2. Coordinator 选出 Leader Consumer
3. Leader 执行分区分配算法
4. 所有 Consumer 收到新分配,只撤销需要转移的 Partition
5. 继续消费(减少停顿)
旧协议(Eager):所有 Consumer 先停止消费,再重新分配(Stop-the-World)减少 Rebalance 的实践:
java
// 增大心跳超时,减少误判
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
// 增大 poll 间隔(处理慢时避免被踢出)
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
// 减少每次 poll 的消息数(加快处理速度)
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);高可用配置
Broker 关键配置
properties
# server.properties
# 副本同步
default.replication.factor=3
min.insync.replicas=2 # 至少2个 ISR 副本才允许写入
# 日志保留
log.retention.hours=168 # 保留7天
log.retention.bytes=107374182400 # 或按大小(100GB)
log.segment.bytes=1073741824 # 单个 Segment 文件 1GB
# 性能
num.io.threads=8
num.network.threads=3
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# 副本拉取
replica.fetch.max.bytes=1048576
replica.lag.time.max.ms=30000 # Follower 落后超过 30s 移出 ISRKRaft 模式(Kafka 3.3+,无 ZooKeeper)
properties
# 控制器节点配置
process.roles=controller
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# Broker 节点配置
process.roles=broker
node.id=4
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
# 混合模式(小集群)
process.roles=broker,controllerTopic 管理
bash
# 创建 Topic
kafka-topics.sh --bootstrap-server kafka:9092 \
--create --topic order-events \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config min.insync.replicas=2
# 查看 Topic 详情
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --topic order-events
# 扩容 Partition(只能增加,不能减少)
kafka-topics.sh --bootstrap-server kafka:9092 \
--alter --topic order-events --partitions 24
# 查看消费者 Lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group order-consumer-group
# 重置 offset(谨慎操作)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group order-consumer-group \
--topic order-events \
--reset-offsets --to-earliest --executeSpring Boot 集成
yaml
# application.yml
spring:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
producer:
acks: all
retries: 3
batch-size: 16384
linger-ms: 5
compression-type: lz4
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
enable.idempotence: true
consumer:
group-id: order-consumer-group
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 500
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example.dto"
listener:
ack-mode: manual_immediate # 手动提交
concurrency: 3 # 3个消费者线程java
// 生产者
@Autowired
private KafkaTemplate<String, OrderEvent> kafkaTemplate;
public void publishOrderEvent(OrderEvent event) {
kafkaTemplate.send("order-events", event.getOrderId(), event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("Failed to send order event", ex);
}
});
}
// 消费者
@KafkaListener(
topics = "order-events",
groupId = "order-consumer-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void handleOrderEvent(
ConsumerRecord<String, OrderEvent> record,
Acknowledgment ack
) {
try {
orderService.process(record.value());
ack.acknowledge(); // 手动提交 offset
} catch (Exception e) {
log.error("Failed to process order event offset={}", record.offset(), e);
// 不 ack,触发重试(或发送到死信 Topic)
ack.nack(Duration.ofSeconds(5)); // 5s 后重试
}
}死信队列(DLT)
java
// Spring Kafka 内置 DLT 支持
@Bean
public DefaultErrorHandler errorHandler(KafkaTemplate<?, ?> template) {
// 重试3次后发送到 DLT(order-events.DLT)
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
ExponentialBackOffWithMaxRetries backOff = new ExponentialBackOffWithMaxRetries(3);
backOff.setInitialInterval(1000L);
backOff.setMultiplier(2.0);
backOff.setMaxInterval(10000L);
return new DefaultErrorHandler(recoverer, backOff);
}
// 消费死信 Topic
@KafkaListener(topics = "order-events.DLT", groupId = "dlt-consumer")
public void handleDeadLetter(ConsumerRecord<String, OrderEvent> record) {
log.error("Dead letter: topic={} partition={} offset={} key={}",
record.topic(), record.partition(), record.offset(), record.key());
// 告警、人工处理
alertService.sendDeadLetterAlert(record);
}故障处理案例
案例一:消费者 Lag 持续增长
现象:kafka-consumer-groups.sh --describe 显示 LAG 持续增大。
排查:
bash
# 查看各 Partition 的 Lag
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group order-consumer-group
# 查看消费者线程数
# 查看消费者处理耗时(应用日志)解决:
- 增加消费者实例(不超过 Partition 数)
- 增大
concurrency(并发消费线程) - 优化消费者处理逻辑(异步化、批量处理)
- 临时跳过积压(业务允许时):
--reset-offsets --to-latest
案例二:Producer 发送超时
现象:Producer 报 TimeoutException: Expiring X record(s) for topic-partition。
原因:
- Broker 负载过高,响应慢
- 网络问题
buffer.memory满,新消息无法入队
解决:
java
// 增大缓冲区和超时
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864); // 64MB
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000); // 阻塞等待 60s
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);案例三:Rebalance 频繁导致消费停顿
现象:日志频繁出现 Rebalancing...,消费延迟增大。
排查:
bash
# 查看 Consumer Group 状态
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group order-consumer-group
# 关注 STATE 列:Stable / PreparingRebalance / CompletingRebalance常见原因:
- 消费者处理太慢,超过
max.poll.interval.ms - GC 停顿导致心跳超时
- 频繁部署(Consumer 实例频繁上下线)
解决:
java
// 减少每次 poll 的消息量,加快处理
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
// 增大 poll 间隔
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
// 使用 Cooperative Sticky 分配策略(减少 Rebalance 影响范围)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());案例四:磁盘空间不足
bash
# 查看各 Topic 磁盘占用
kafka-log-dirs.sh --bootstrap-server kafka:9092 \
--topic-list order-events --describe
# 临时缩短保留时间
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name order-events \
--alter --add-config retention.ms=86400000 # 改为1天
# 触发立即删除过期 Segment
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type brokers --entity-name 1 \
--alter --add-config log.retention.check.interval.ms=1000监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
kafka_consumer_lag | 消费者 Lag | > 10万 |
kafka_server_BrokerTopicMetrics_MessagesInPerSec | 消息写入速率 | 接近上限 |
kafka_server_ReplicaManager_UnderReplicatedPartitions | 副本不足的 Partition 数 | > 0 |
kafka_controller_KafkaController_ActiveControllerCount | 活跃 Controller 数 | ≠ 1 |
kafka_server_BrokerTopicMetrics_BytesInPerSec | 写入字节速率 | 接近网卡上限 |
kafka_log_LogFlushRateAndTimeMs | 日志刷盘延迟 | P99 > 1s |
kafka_network_RequestMetrics_RequestQueueTimeMs | 请求队列等待时间 | P99 > 100ms |
Kafka vs 其他消息队列
| 维度 | Kafka | RocketMQ | RabbitMQ | Pulsar |
|---|---|---|---|---|
| 吞吐量 | 极高(百万/s) | 高(十万/s) | 中(万/s) | 极高 |
| 延迟 | 中(5~15ms) | 低(<5ms) | 极低(<1ms) | 低(<5ms) |
| 消息回放 | ✅ 按 offset | ✅ 按时间 | ❌ | ✅ |
| 顺序消息 | Partition 内有序 | 全局/分区有序 | Queue 内有序 | Key 有序 |
| 事务消息 | ✅(0.11+) | ✅ | ❌ | ✅ |
| 延迟消息 | ❌(需插件) | ✅(18级/任意) | ✅(插件) | ✅ |
| 多租户 | ❌ | 基础 | VHost | ✅ 原生 |
| 存算分离 | ❌ | ❌ | ❌ | ✅ |
| 适用场景 | 日志流、大数据、事件溯源 | 金融交易、顺序消息 | 业务解耦、任务队列 | 云原生、多租户 |