Skip to content

RabbitMQ — AMQP 消息代理

核心架构

RabbitMQ 基于 AMQP 0-9-1 协议,采用 Broker 中心化架构。消息不直接发送到队列,而是先到 Exchange,由 Exchange 根据路由规则分发到绑定的队列。

Producer
  └──► Exchange ──(Binding + Routing Key)──► Queue ──► Consumer

         Exchange Types:
           direct / fanout / topic / headers

核心组件

组件说明
Producer消息生产者,发布消息到 Exchange
Exchange消息路由器,不存储消息
BindingExchange 与 Queue 的绑定关系
Queue消息存储队列,消费者从此消费
Consumer消息消费者
Virtual Host逻辑隔离单元,类似命名空间
ChannelTCP 连接内的虚拟连接,复用连接

Exchange 类型详解

Direct Exchange:精确匹配 Routing Key

Exchange(direct) ──[routing_key=order.created]──► Queue: order-queue
                 ──[routing_key=user.created]───► Queue: user-queue

Fanout Exchange:广播,忽略 Routing Key

Exchange(fanout) ──► Queue: service-A
                 ──► Queue: service-B
                 ──► Queue: service-C

Topic Exchange:通配符匹配(* 匹配一个词,# 匹配零或多个词)

Exchange(topic)
  ──[order.*]──────► Queue: order-all
  ──[order.created]► Queue: order-new
  ──[#.error]──────► Queue: all-errors

Headers Exchange:基于消息头属性匹配,性能较差,少用。

消息可靠性保障

生产者确认(Publisher Confirms)

java
// 开启 Confirm 模式
channel.confirmSelect();

// 发送消息
channel.basicPublish(exchange, routingKey, props, body);

// 同步等待确认(简单但阻塞)
channel.waitForConfirmsOrDie(5000);

// 异步确认(推荐)
channel.addConfirmListener(
    (deliveryTag, multiple) -> log.info("ACK: {}", deliveryTag),
    (deliveryTag, multiple) -> log.warn("NACK: {}", deliveryTag)
);

消费者确认(Consumer ACK)

java
// 手动 ACK 模式(autoAck=false)
channel.basicConsume(queueName, false, (tag, delivery) -> {
    try {
        process(delivery.getBody());
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    } catch (Exception e) {
        // requeue=true 重新入队,false 丢弃或进死信队列
        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
    }
}, tag -> {});

消息持久化

java
// 队列持久化
channel.queueDeclare("my-queue", 
    true,   // durable=true
    false,  // exclusive
    false,  // autoDelete
    null);

// 消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .deliveryMode(2)  // 2=持久化
    .build();
channel.basicPublish(exchange, routingKey, props, body);

⚠️ 持久化不能 100% 保证不丢失,消息写入磁盘前 Broker 崩溃仍会丢失。需配合 Publisher Confirms 使用。

死信队列(DLX)

消息成为死信的三种情况:

  1. 消息被 NACK 且 requeue=false
  2. 消息 TTL 过期
  3. 队列达到最大长度
java
// 声明死信队列
channel.queueDeclare("dlx-queue", true, false, false, null);
channel.exchangeDeclare("dlx-exchange", "direct");
channel.queueBind("dlx-queue", "dlx-exchange", "dlx-key");

// 业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
args.put("x-dead-letter-routing-key", "dlx-key");
args.put("x-message-ttl", 30000);  // 消息 TTL 30s
args.put("x-max-length", 10000);   // 最大队列长度

channel.queueDeclare("business-queue", true, false, false, args);

延迟消息

RabbitMQ 原生不支持延迟消息,有两种实现方式:

方式一:TTL + 死信队列

Producer ──► delay-queue(TTL=10s, DLX=real-exchange) ──[过期]──► real-queue ──► Consumer

缺点:队列头部消息未过期会阻塞后续消息(即使后续消息已过期)。

方式二:rabbitmq-delayed-message-exchange 插件

bash
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
java
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);

// 发送延迟消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .headers(Map.of("x-delay", 10000))  // 延迟 10s
    .build();
channel.basicPublish("delayed-exchange", routingKey, props, body);

集群架构

普通集群(Classic Cluster)

Node1 (Queue Master) ←──── Node2 (Queue Mirror)
      ↑                           ↑
   Consumer                   Consumer
   (直接消费)              (转发到 Master)
  • 队列数据只存在于 Master 节点
  • 其他节点只存储元数据,消费时转发请求
  • Master 宕机,队列不可用(除非开启镜像)

镜像队列(Mirrored Queue)

java
// 通过 Policy 设置镜像
rabbitmqctl set_policy ha-all "^ha\." \
    '{"ha-mode":"all","ha-sync-mode":"automatic"}'
  • 所有节点都有完整队列副本
  • 性能开销大,写入需同步到所有镜像
  • RabbitMQ 3.8+ 推荐使用 Quorum Queue 替代

Quorum Queue(仲裁队列)

基于 Raft 协议,RabbitMQ 3.8+ 推荐的高可用方案:

java
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("my-quorum-queue", true, false, false, args);

优势:

  • Raft 协议保证强一致性
  • 自动 Leader 选举
  • 更好的数据安全性

最佳实践

连接与 Channel 管理

java
// ✅ 正确:连接复用,每个线程独立 Channel
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();  // 应用级别单例

// 每个线程/请求创建独立 Channel
Channel channel = connection.createChannel();
// ... 使用 channel
channel.close();  // 用完关闭
java
// ❌ 错误:每次操作创建新连接(TCP 开销巨大)
Connection conn = factory.newConnection();
Channel ch = conn.createChannel();
ch.basicPublish(...);
conn.close();

消费者预取(QoS)

java
// 每次最多推送 10 条未确认消息,防止消费者被压垮
channel.basicQos(10);
channel.basicConsume(queueName, false, deliverCallback, cancelCallback);

消息幂等性

消费者必须实现幂等处理,因为网络异常可能导致消息重复投递:

java
void processMessage(String messageId, byte[] body) {
    // 检查是否已处理(Redis / DB 去重)
    if (idempotencyStore.exists(messageId)) {
        return;
    }
    // 业务处理
    doProcess(body);
    // 记录已处理
    idempotencyStore.set(messageId, "1", Duration.ofDays(7));
}

故障处理案例

案例一:消息堆积导致内存告警

现象:RabbitMQ 内存使用率超过 vm_memory_high_watermark(默认 40%),触发 Flow Control,生产者被阻塞。

排查步骤

bash
# 查看队列状态
rabbitmqctl list_queues name messages consumers memory

# 查看连接状态
rabbitmqctl list_connections name state recv_oct send_oct

# 查看内存使用
rabbitmqctl status | grep memory

解决方案

  1. 临时:增加消费者实例,加速消费
  2. 临时:rabbitmqctl purge_queue <queue> 清空非重要队列
  3. 根本:优化消费者处理速度,设置合理的 x-max-lengthx-overflow: reject-publish

案例二:消息重复消费

现象:消费者处理完消息后,ACK 前网络断开,消息重新入队被再次消费。

解决方案

java
// 方案:数据库唯一约束 + 幂等处理
@Transactional
public void handleOrder(OrderMessage msg) {
    // 利用数据库唯一索引防重
    try {
        orderService.createOrder(msg.getOrderId(), msg.getData());
    } catch (DuplicateKeyException e) {
        // 已处理,直接 ACK
        log.warn("Duplicate message: {}", msg.getOrderId());
    }
}

案例三:脑裂(Network Partition)

现象:集群节点间网络分区,各自认为对方宕机,出现双 Master。

配置建议

ini
# rabbitmq.conf
# 分区处理策略:pause-minority(少数派暂停服务)
cluster_partition_handling = pause-minority
策略说明适用场景
ignore忽略分区(默认)不推荐
pause-minority少数派节点暂停奇数节点集群
pause-if-all-down所有节点都断才暂停特定场景
autoheal自动选择胜者可接受数据丢失

监控指标

指标告警阈值说明
rabbitmq_queue_messages> 10万消息堆积
rabbitmq_queue_consumers= 0无消费者
rabbitmq_node_mem_used> 80%内存压力
rabbitmq_node_disk_free< 1GB磁盘告警
rabbitmq_connections> 1000连接数过多
rabbitmq_channel_messages_unacked> 1000未确认消息堆积

PaaS 中间件生态系统深度学习文档