RocketMQ — 金融级消息队列
架构概览
RocketMQ 由阿里巴巴开源,专为金融级场景设计,支持顺序消息、事务消息、延迟消息。
Producer Group
└──► NameServer(注册中心,无状态)
└──► Broker Master ──► Broker Slave(同步/异步复制)
↑
Topic: order-topic
├── Queue 0
├── Queue 1
├── Queue 2
└── Queue 3
↓
Consumer Group
├── Consumer A(消费 Queue 0,1)
└── Consumer B(消费 Queue 2,3)核心组件
| 组件 | 职责 |
|---|---|
| NameServer | 轻量级注册中心,Broker 注册,Producer/Consumer 路由查询 |
| Broker | 消息存储与转发,分 Master/Slave |
| Producer | 消息生产者,支持同步/异步/单向发送 |
| Consumer | 消息消费者,支持 Push/Pull 模式 |
| Topic | 消息主题,逻辑分类 |
| Queue | Topic 的物理分区,消费并行度单元 |
存储架构
Broker 存储目录:
├── commitlog/ # 所有消息顺序写入(单文件 1GB)
├── consumequeue/ # 消费队列索引(Topic/Queue/offset)
│ └── order-topic/
│ ├── 0/ # Queue 0 的索引文件
│ └── 1/ # Queue 1 的索引文件
└── index/ # 消息索引(按 Key 查询)顺序写盘是 RocketMQ 高性能的关键:所有消息追加写入 CommitLog,避免随机 IO。
消息类型
普通消息
java
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("order-topic", "TagA",
"Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
// 同步发送(等待 Broker 确认)
SendResult result = producer.send(msg);
System.out.println(result.getSendStatus()); // SEND_OK
// 异步发送
producer.send(msg, new SendCallback() {
public void onSuccess(SendResult result) { /* 成功 */ }
public void onException(Throwable e) { /* 失败处理 */ }
});
// 单向发送(不等确认,最高性能,可能丢失)
producer.sendOneway(msg);顺序消息
RocketMQ 通过将同一业务 Key 的消息路由到同一 Queue 实现顺序性。
java
// 生产者:同一订单的消息发到同一 Queue
producer.send(msg, (mqs, msg, arg) -> {
String orderId = (String) arg;
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}, orderId);
// 消费者:使用 MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
msgs.forEach(msg -> processInOrder(msg));
return ConsumeOrderlyStatus.SUCCESS;
});⚠️ 顺序消息在 Broker 宕机时会暂停消费(等待恢复),不会乱序消费。
延迟消息
RocketMQ 支持 18 个固定延迟级别(开源版):
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hjava
Message msg = new Message("delay-topic", body);
msg.setDelayTimeLevel(3); // 延迟 10s(第3级)
producer.send(msg);RocketMQ 5.x 支持任意时间延迟:
java
// 5.x API
msg.setDeliverTimeMs(System.currentTimeMillis() + 60_000); // 60s 后投递事务消息
解决分布式事务中"本地事务与消息发送"的原子性问题:
Phase 1: 发送半消息(Half Message)
Producer ──► Broker(消息对消费者不可见)
Phase 2: 执行本地事务
Producer 执行本地 DB 操作
Phase 3: 提交或回滚
成功 → Commit(消息变为可见)
失败 → Rollback(消息删除)
Phase 4: 事务回查(Broker 主动询问)
超时未提交 → Broker 回查 Producerjava
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(new TransactionListener() {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder((OrderDTO) arg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 事务回查(Broker 超时后调用)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getUserProperty("orderId");
return orderService.exists(orderId)
? LocalTransactionState.COMMIT_MESSAGE
: LocalTransactionState.ROLLBACK_MESSAGE;
}
});
// 发送事务消息
producer.sendMessageInTransaction(msg, orderDTO);消费模式
集群消费(默认)
同一 Consumer Group 内,每条消息只被一个消费者消费,Queue 均匀分配。
java
consumer.setMessageModel(MessageModel.CLUSTERING); // 默认广播消费
同一 Consumer Group 内,每个消费者都消费所有消息。
java
consumer.setMessageModel(MessageModel.BROADCASTING);消费进度管理
集群消费:消费进度存储在 Broker(RemoteBrokerOffsetStore)
广播消费:消费进度存储在本地(LocalFileOffsetStore)高可用架构
主从同步模式
Master Broker ──[同步复制]──► Slave Broker
↑ ↑
写入消息 备份消息
同步双写(SYNC_MASTER):
- 消息写入 Master 和 Slave 后才返回 ACK
- 数据安全性高,性能略低
异步复制(ASYNC_MASTER):
- 消息写入 Master 后立即返回 ACK
- 性能高,Master 宕机可能丢失少量消息Dledger 模式(RocketMQ 4.5+)
基于 Raft 协议的自动主从切换:
Dledger Group(3节点):
Node1(Leader)──► Node2(Follower)
──► Node3(Follower)
Leader 宕机 → 自动选举新 Leader最佳实践
消息 Key 设置
java
// 设置业务 Key,便于消息追踪和去重
msg.setKeys("ORDER_" + orderId);
// 通过 Key 查询消息(运维排查)
QueryResult result = admin.queryMessage("order-topic", "ORDER_12345", 10, 0, System.currentTimeMillis());消费失败重试
RocketMQ 默认重试 16 次,重试间隔递增:
1s → 5s → 10s → 30s → 1m → 2m → 3m → ... → 2h超过重试次数进入死信队列(%DLQ%consumer-group):
java
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
process(msg);
} catch (Exception e) {
if (msg.getReconsumeTimes() >= 3) {
// 超过3次,记录告警,手动处理
alertService.send("消息处理失败", msg);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 不再重试
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 触发重试
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});故障处理案例
案例一:消息积压
现象:Consumer 处理速度跟不上生产速度,队列消息数持续增长。
应急处理:
bash
# 1. 临时扩容消费者(增加实例数)
# 2. 如果消费者逻辑有 Bug,先暂停消费,修复后重启
# 3. 极端情况:跳过积压消息(业务允许时)
mqadmin resetOffsetByTime -n localhost:9876 -g consumer-group -t topic -s now根本解决:
- 优化消费者处理逻辑(异步化、批量处理)
- 增加 Topic 的 Queue 数量(提高并行度)
- 消费者批量消费:
consumer.setConsumeMessageBatchMaxSize(32)
案例二:Broker 磁盘满
现象:Broker 磁盘使用率 100%,消息无法写入。
排查:
bash
# 查看 CommitLog 占用
du -sh /data/rocketmq/store/commitlog/
# 查看消息保留配置
# broker.conf
fileReservedTime=48 # 消息保留 48 小时
diskMaxUsedSpaceRatio=75 # 磁盘使用率超 75% 开始删除解决:
- 临时:手动删除过期 CommitLog 文件
- 根本:调整
fileReservedTime,增加磁盘容量,或将 CommitLog 迁移到大容量磁盘
案例三:NameServer 全部宕机
现象:NameServer 全部不可用,但 Producer/Consumer 已缓存路由信息,短期内仍可正常工作。
影响:
- 新启动的 Producer/Consumer 无法获取路由,无法工作
- 已运行的实例依赖本地缓存,可继续工作约 30 分钟
处理:立即重启 NameServer,Broker 会自动重新注册。
监控指标
| 指标 | 说明 | 告警阈值 |
|---|---|---|
消息堆积量 | Consumer Lag | > 10万 |
TPS | 每秒消息数 | 接近设计上限 |
消费失败率 | RECONSUME_LATER 比例 | > 1% |
Broker 磁盘使用率 | CommitLog 磁盘占用 | > 70% |
主从同步延迟 | Slave 落后 Master 的字节数 | > 100MB |