RabbitMQ — AMQP 消息代理
核心架构
RabbitMQ 基于 AMQP 0-9-1 协议,采用 Broker 中心化架构。消息不直接发送到队列,而是先到 Exchange,由 Exchange 根据路由规则分发到绑定的队列。
Producer
└──► Exchange ──(Binding + Routing Key)──► Queue ──► Consumer
↑
Exchange Types:
direct / fanout / topic / headers核心组件
| 组件 | 说明 |
|---|---|
| Producer | 消息生产者,发布消息到 Exchange |
| Exchange | 消息路由器,不存储消息 |
| Binding | Exchange 与 Queue 的绑定关系 |
| Queue | 消息存储队列,消费者从此消费 |
| Consumer | 消息消费者 |
| Virtual Host | 逻辑隔离单元,类似命名空间 |
| Channel | TCP 连接内的虚拟连接,复用连接 |
Exchange 类型详解
Direct Exchange:精确匹配 Routing Key
Exchange(direct) ──[routing_key=order.created]──► Queue: order-queue
──[routing_key=user.created]───► Queue: user-queueFanout Exchange:广播,忽略 Routing Key
Exchange(fanout) ──► Queue: service-A
──► Queue: service-B
──► Queue: service-CTopic Exchange:通配符匹配(* 匹配一个词,# 匹配零或多个词)
Exchange(topic)
──[order.*]──────► Queue: order-all
──[order.created]► Queue: order-new
──[#.error]──────► Queue: all-errorsHeaders Exchange:基于消息头属性匹配,性能较差,少用。
消息可靠性保障
生产者确认(Publisher Confirms)
java
// 开启 Confirm 模式
channel.confirmSelect();
// 发送消息
channel.basicPublish(exchange, routingKey, props, body);
// 同步等待确认(简单但阻塞)
channel.waitForConfirmsOrDie(5000);
// 异步确认(推荐)
channel.addConfirmListener(
(deliveryTag, multiple) -> log.info("ACK: {}", deliveryTag),
(deliveryTag, multiple) -> log.warn("NACK: {}", deliveryTag)
);消费者确认(Consumer ACK)
java
// 手动 ACK 模式(autoAck=false)
channel.basicConsume(queueName, false, (tag, delivery) -> {
try {
process(delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (Exception e) {
// requeue=true 重新入队,false 丢弃或进死信队列
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
}
}, tag -> {});消息持久化
java
// 队列持久化
channel.queueDeclare("my-queue",
true, // durable=true
false, // exclusive
false, // autoDelete
null);
// 消息持久化
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.deliveryMode(2) // 2=持久化
.build();
channel.basicPublish(exchange, routingKey, props, body);⚠️ 持久化不能 100% 保证不丢失,消息写入磁盘前 Broker 崩溃仍会丢失。需配合 Publisher Confirms 使用。
死信队列(DLX)
消息成为死信的三种情况:
- 消息被 NACK 且 requeue=false
- 消息 TTL 过期
- 队列达到最大长度
java
// 声明死信队列
channel.queueDeclare("dlx-queue", true, false, false, null);
channel.exchangeDeclare("dlx-exchange", "direct");
channel.queueBind("dlx-queue", "dlx-exchange", "dlx-key");
// 业务队列绑定死信交换机
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx-exchange");
args.put("x-dead-letter-routing-key", "dlx-key");
args.put("x-message-ttl", 30000); // 消息 TTL 30s
args.put("x-max-length", 10000); // 最大队列长度
channel.queueDeclare("business-queue", true, false, false, args);延迟消息
RabbitMQ 原生不支持延迟消息,有两种实现方式:
方式一:TTL + 死信队列
Producer ──► delay-queue(TTL=10s, DLX=real-exchange) ──[过期]──► real-queue ──► Consumer缺点:队列头部消息未过期会阻塞后续消息(即使后续消息已过期)。
方式二:rabbitmq-delayed-message-exchange 插件
bash
# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchangejava
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("delayed-exchange", "x-delayed-message", true, false, args);
// 发送延迟消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
.headers(Map.of("x-delay", 10000)) // 延迟 10s
.build();
channel.basicPublish("delayed-exchange", routingKey, props, body);集群架构
普通集群(Classic Cluster)
Node1 (Queue Master) ←──── Node2 (Queue Mirror)
↑ ↑
Consumer Consumer
(直接消费) (转发到 Master)- 队列数据只存在于 Master 节点
- 其他节点只存储元数据,消费时转发请求
- Master 宕机,队列不可用(除非开启镜像)
镜像队列(Mirrored Queue)
java
// 通过 Policy 设置镜像
rabbitmqctl set_policy ha-all "^ha\." \
'{"ha-mode":"all","ha-sync-mode":"automatic"}'- 所有节点都有完整队列副本
- 性能开销大,写入需同步到所有镜像
- RabbitMQ 3.8+ 推荐使用 Quorum Queue 替代
Quorum Queue(仲裁队列)
基于 Raft 协议,RabbitMQ 3.8+ 推荐的高可用方案:
java
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("my-quorum-queue", true, false, false, args);优势:
- Raft 协议保证强一致性
- 自动 Leader 选举
- 更好的数据安全性
最佳实践
连接与 Channel 管理
java
// ✅ 正确:连接复用,每个线程独立 Channel
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection(); // 应用级别单例
// 每个线程/请求创建独立 Channel
Channel channel = connection.createChannel();
// ... 使用 channel
channel.close(); // 用完关闭java
// ❌ 错误:每次操作创建新连接(TCP 开销巨大)
Connection conn = factory.newConnection();
Channel ch = conn.createChannel();
ch.basicPublish(...);
conn.close();消费者预取(QoS)
java
// 每次最多推送 10 条未确认消息,防止消费者被压垮
channel.basicQos(10);
channel.basicConsume(queueName, false, deliverCallback, cancelCallback);消息幂等性
消费者必须实现幂等处理,因为网络异常可能导致消息重复投递:
java
void processMessage(String messageId, byte[] body) {
// 检查是否已处理(Redis / DB 去重)
if (idempotencyStore.exists(messageId)) {
return;
}
// 业务处理
doProcess(body);
// 记录已处理
idempotencyStore.set(messageId, "1", Duration.ofDays(7));
}故障处理案例
案例一:消息堆积导致内存告警
现象:RabbitMQ 内存使用率超过 vm_memory_high_watermark(默认 40%),触发 Flow Control,生产者被阻塞。
排查步骤:
bash
# 查看队列状态
rabbitmqctl list_queues name messages consumers memory
# 查看连接状态
rabbitmqctl list_connections name state recv_oct send_oct
# 查看内存使用
rabbitmqctl status | grep memory解决方案:
- 临时:增加消费者实例,加速消费
- 临时:
rabbitmqctl purge_queue <queue>清空非重要队列 - 根本:优化消费者处理速度,设置合理的
x-max-length和x-overflow: reject-publish
案例二:消息重复消费
现象:消费者处理完消息后,ACK 前网络断开,消息重新入队被再次消费。
解决方案:
java
// 方案:数据库唯一约束 + 幂等处理
@Transactional
public void handleOrder(OrderMessage msg) {
// 利用数据库唯一索引防重
try {
orderService.createOrder(msg.getOrderId(), msg.getData());
} catch (DuplicateKeyException e) {
// 已处理,直接 ACK
log.warn("Duplicate message: {}", msg.getOrderId());
}
}案例三:脑裂(Network Partition)
现象:集群节点间网络分区,各自认为对方宕机,出现双 Master。
配置建议:
ini
# rabbitmq.conf
# 分区处理策略:pause-minority(少数派暂停服务)
cluster_partition_handling = pause-minority| 策略 | 说明 | 适用场景 |
|---|---|---|
ignore | 忽略分区(默认) | 不推荐 |
pause-minority | 少数派节点暂停 | 奇数节点集群 |
pause-if-all-down | 所有节点都断才暂停 | 特定场景 |
autoheal | 自动选择胜者 | 可接受数据丢失 |
监控指标
| 指标 | 告警阈值 | 说明 |
|---|---|---|
rabbitmq_queue_messages | > 10万 | 消息堆积 |
rabbitmq_queue_consumers | = 0 | 无消费者 |
rabbitmq_node_mem_used | > 80% | 内存压力 |
rabbitmq_node_disk_free | < 1GB | 磁盘告警 |
rabbitmq_connections | > 1000 | 连接数过多 |
rabbitmq_channel_messages_unacked | > 1000 | 未确认消息堆积 |