Skip to content

RocketMQ — 金融级消息队列

架构概览

RocketMQ 由阿里巴巴开源,专为金融级场景设计,支持顺序消息、事务消息、延迟消息。

Producer Group
  └──► NameServer(注册中心,无状态)
  └──► Broker Master ──► Broker Slave(同步/异步复制)

         Topic: order-topic
           ├── Queue 0
           ├── Queue 1
           ├── Queue 2
           └── Queue 3

         Consumer Group
           ├── Consumer A(消费 Queue 0,1)
           └── Consumer B(消费 Queue 2,3)

核心组件

组件职责
NameServer轻量级注册中心,Broker 注册,Producer/Consumer 路由查询
Broker消息存储与转发,分 Master/Slave
Producer消息生产者,支持同步/异步/单向发送
Consumer消息消费者,支持 Push/Pull 模式
Topic消息主题,逻辑分类
QueueTopic 的物理分区,消费并行度单元

存储架构

Broker 存储目录:
  ├── commitlog/          # 所有消息顺序写入(单文件 1GB)
  ├── consumequeue/       # 消费队列索引(Topic/Queue/offset)
  │     └── order-topic/
  │           ├── 0/      # Queue 0 的索引文件
  │           └── 1/      # Queue 1 的索引文件
  └── index/              # 消息索引(按 Key 查询)

顺序写盘是 RocketMQ 高性能的关键:所有消息追加写入 CommitLog,避免随机 IO。

消息类型

普通消息

java
DefaultMQProducer producer = new DefaultMQProducer("producer-group");
producer.setNamesrvAddr("localhost:9876");
producer.start();

Message msg = new Message("order-topic", "TagA", 
    "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));

// 同步发送(等待 Broker 确认)
SendResult result = producer.send(msg);
System.out.println(result.getSendStatus());  // SEND_OK

// 异步发送
producer.send(msg, new SendCallback() {
    public void onSuccess(SendResult result) { /* 成功 */ }
    public void onException(Throwable e) { /* 失败处理 */ }
});

// 单向发送(不等确认,最高性能,可能丢失)
producer.sendOneway(msg);

顺序消息

RocketMQ 通过将同一业务 Key 的消息路由到同一 Queue 实现顺序性。

java
// 生产者:同一订单的消息发到同一 Queue
producer.send(msg, (mqs, msg, arg) -> {
    String orderId = (String) arg;
    int index = Math.abs(orderId.hashCode()) % mqs.size();
    return mqs.get(index);
}, orderId);

// 消费者:使用 MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
    msgs.forEach(msg -> processInOrder(msg));
    return ConsumeOrderlyStatus.SUCCESS;
});

⚠️ 顺序消息在 Broker 宕机时会暂停消费(等待恢复),不会乱序消费。

延迟消息

RocketMQ 支持 18 个固定延迟级别(开源版):

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
java
Message msg = new Message("delay-topic", body);
msg.setDelayTimeLevel(3);  // 延迟 10s(第3级)
producer.send(msg);

RocketMQ 5.x 支持任意时间延迟:

java
// 5.x API
msg.setDeliverTimeMs(System.currentTimeMillis() + 60_000);  // 60s 后投递

事务消息

解决分布式事务中"本地事务与消息发送"的原子性问题:

Phase 1: 发送半消息(Half Message)
  Producer ──► Broker(消息对消费者不可见)
  
Phase 2: 执行本地事务
  Producer 执行本地 DB 操作
  
Phase 3: 提交或回滚
  成功 → Commit(消息变为可见)
  失败 → Rollback(消息删除)
  
Phase 4: 事务回查(Broker 主动询问)
  超时未提交 → Broker 回查 Producer
java
TransactionMQProducer producer = new TransactionMQProducer("tx-group");
producer.setTransactionListener(new TransactionListener() {
    
    // 执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        try {
            orderService.createOrder((OrderDTO) arg);
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
    
    // 事务回查(Broker 超时后调用)
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String orderId = msg.getUserProperty("orderId");
        return orderService.exists(orderId) 
            ? LocalTransactionState.COMMIT_MESSAGE 
            : LocalTransactionState.ROLLBACK_MESSAGE;
    }
});

// 发送事务消息
producer.sendMessageInTransaction(msg, orderDTO);

消费模式

集群消费(默认)

同一 Consumer Group 内,每条消息只被一个消费者消费,Queue 均匀分配。

java
consumer.setMessageModel(MessageModel.CLUSTERING);  // 默认

广播消费

同一 Consumer Group 内,每个消费者都消费所有消息。

java
consumer.setMessageModel(MessageModel.BROADCASTING);

消费进度管理

集群消费:消费进度存储在 Broker(RemoteBrokerOffsetStore)
广播消费:消费进度存储在本地(LocalFileOffsetStore)

高可用架构

主从同步模式

Master Broker ──[同步复制]──► Slave Broker
     ↑                              ↑
  写入消息                      备份消息
  
同步双写(SYNC_MASTER):
  - 消息写入 Master 和 Slave 后才返回 ACK
  - 数据安全性高,性能略低

异步复制(ASYNC_MASTER):
  - 消息写入 Master 后立即返回 ACK
  - 性能高,Master 宕机可能丢失少量消息

Dledger 模式(RocketMQ 4.5+)

基于 Raft 协议的自动主从切换:

Dledger Group(3节点):
  Node1(Leader)──► Node2(Follower)
                 ──► Node3(Follower)
  
  Leader 宕机 → 自动选举新 Leader

最佳实践

消息 Key 设置

java
// 设置业务 Key,便于消息追踪和去重
msg.setKeys("ORDER_" + orderId);

// 通过 Key 查询消息(运维排查)
QueryResult result = admin.queryMessage("order-topic", "ORDER_12345", 10, 0, System.currentTimeMillis());

消费失败重试

RocketMQ 默认重试 16 次,重试间隔递增:

1s → 5s → 10s → 30s → 1m → 2m → 3m → ... → 2h

超过重试次数进入死信队列(%DLQ%consumer-group):

java
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    for (MessageExt msg : msgs) {
        try {
            process(msg);
        } catch (Exception e) {
            if (msg.getReconsumeTimes() >= 3) {
                // 超过3次,记录告警,手动处理
                alertService.send("消息处理失败", msg);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  // 不再重试
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;  // 触发重试
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

故障处理案例

案例一:消息积压

现象:Consumer 处理速度跟不上生产速度,队列消息数持续增长。

应急处理

bash
# 1. 临时扩容消费者(增加实例数)
# 2. 如果消费者逻辑有 Bug,先暂停消费,修复后重启
# 3. 极端情况:跳过积压消息(业务允许时)
mqadmin resetOffsetByTime -n localhost:9876 -g consumer-group -t topic -s now

根本解决

  • 优化消费者处理逻辑(异步化、批量处理)
  • 增加 Topic 的 Queue 数量(提高并行度)
  • 消费者批量消费:consumer.setConsumeMessageBatchMaxSize(32)

案例二:Broker 磁盘满

现象:Broker 磁盘使用率 100%,消息无法写入。

排查

bash
# 查看 CommitLog 占用
du -sh /data/rocketmq/store/commitlog/

# 查看消息保留配置
# broker.conf
fileReservedTime=48  # 消息保留 48 小时
diskMaxUsedSpaceRatio=75  # 磁盘使用率超 75% 开始删除

解决

  1. 临时:手动删除过期 CommitLog 文件
  2. 根本:调整 fileReservedTime,增加磁盘容量,或将 CommitLog 迁移到大容量磁盘

案例三:NameServer 全部宕机

现象:NameServer 全部不可用,但 Producer/Consumer 已缓存路由信息,短期内仍可正常工作。

影响

  • 新启动的 Producer/Consumer 无法获取路由,无法工作
  • 已运行的实例依赖本地缓存,可继续工作约 30 分钟

处理:立即重启 NameServer,Broker 会自动重新注册。

监控指标

指标说明告警阈值
消息堆积量Consumer Lag> 10万
TPS每秒消息数接近设计上限
消费失败率RECONSUME_LATER 比例> 1%
Broker 磁盘使用率CommitLog 磁盘占用> 70%
主从同步延迟Slave 落后 Master 的字节数> 100MB

PaaS 中间件生态系统深度学习文档