一,消息的可靠性
- 客户端代码中的异常捕获,包括生产者和消费者
消息发送过程通过try catch 方式捕获异常, 在异常处理理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式, 并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试
- AMQP/RabbitMQ的事务机制
没有捕获到异常并不能代表消息就一定投递成功了。 一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是,这种方式在性能方面的开销 比较大,一般也不推荐使用。
- 发送端确认机制
RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派 一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么 确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 ID),这样生产者就知道消息已经正确送达了。
waitForConfirm方法有个重载的,可以自定义timeout超时时间,超时后会抛 TimeoutException。
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Publisher Confirms
channel.confirmSelect();
channel.exchangeDeclare(EX_PUBLISHER_CONFIRMS, BuiltinExchangeType.DIRECT);
channel.queueDeclare(QUEUE_PUBLISHER_CONFIRMS, false, false, false, null);
channel.queueBind(QUEUE_PUBLISHER_CONFIRMS, EX_PUBLISHER_CONFIRMS,
QUEUE_PUBLISHER_CONFIRMS);
String message = "hello";
channel.basicPublish(EX_PUBLISHER_CONFIRMS, QUEUE_PUBLISHER_CONFIRMS, null,
message.getBytes());
try {
channel.waitForConfirmsOrDie(5_000);
System.out.println("消息被确认:message = " + message);
} catch (IOException e) {
e.printStackTrace();
System.err.println("消息被拒绝! message = " + message);
} catch (InterruptedException e) {
e.printStackTrace();
System.err.println("在不是Publisher Confirms的通道上使用该方法");
} catch (TimeoutException e) {
e.printStackTrace();
System.err.println("等待消息确认超时! message = " + message);
}
- 消息持久化机制
持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机 等)数据将会丢失。主要从以下几个方面来保障消息的持久性:
- Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不不丢失。
- Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不 丢失。
- 消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2 即可实现消息的持久化,保证消息自身不丢失。
RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处理),这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:
- 队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息,包括消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应的rabbit_queue_index。
- 消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列享,在每个节点中有且只有一个。
- 5. Broker端的高可用集群
- 6. 消费者确认机制
一般而言,有如下处理手段:
- 采用NONE模式,消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表, 再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险
- 采用AUTO(自动Ack)模式,不主动捕获异常,当消费过程中出现异常时会将消息放回 Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新 被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
- 采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回 Ack
/**
* NONE模式,则只要收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险
* AUTO模式,看情况确认,如果此时消费者抛出异常则消息会返回到队列中
* MANUAL模式,需要显式的调用当前channel的basicAck方法
* @param channel
* @param deliveryTag
* @param message
*/
@RabbitListener(queues = "lagou.topic.queue", ackMode = "AUTO")
public void handleMessageTopic(Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, @Payload byte[] message) {
System.out.println("RabbitListener消费消息,消息内容:" + new String((message)));
try {
// 手动ack,deliveryTag表示消息的唯一标志,multiple表示是否是批量确认
channel.basicAck(deliveryTag, false);
// 手动nack,告诉broker消费者处理失败,最后一个参数表示是否需要将消息重新入列
channel.basicNack(deliveryTag, false, true);
// 手动拒绝消息。第二个参数表示是否重新入列
channel.basicReject(deliveryTag, true);
} catch (IOException e) {
e.printStackTrace();
}
}
SpringBoot相关配置:
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts=5
#是否开启消费者重试(为false时关闭消费者重试,意思不是“不重试”,而是一直收到消息直到jack
确认或者一直到超时)
spring.rabbitmq.listener.simple.retry.enabled=true
#重试间隔时间(单位毫秒)
spring.rabbitmq.listener.simple.retry.initial-interval=5000
# 重试超过最大次数后是否拒绝
spring.rabbitmq.listener.simple.default-requeue-rejected=false
#ack模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 7. 消费端限流
通常存在以下解决方案:
1,RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直 到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内 存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已 连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和 管理插件中显示为“已阻止”,这意味着它们尚未尝试发布,因此可以继续或被阻止,这意味着 它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。
2,基于credit flow 的流控机制,面向每一个连接进行流控。当单 个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个链接的流控可 能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到
3,RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被Ack的消息数量,如果 超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量 消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是 QoS机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack模式。
- 8. 消息幂等性
一般消息中间件的消息传输保障 分为三个层级:
- At most once:最多一次。消息可能会丢失,但绝不会重复传输
- At least once:最少一次。消息绝不会丢失,但可能会重复传输
- Exactly once:恰好一次。每条消息肯定会被传输一次且仅传输一次
其中“最少一次”投递实现需要考虑以下这个几个方面的内容:
- 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到 RabbitMQ 中。
- 消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队 列中,进而能够保存下来而不会被丢弃。
- 消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会造成消息 丢失。
- 消费者在消费消息的同时需要将autoAck 设置为false,然后通过手动确认的方式去确认已经 正确消费的消息,以避免在消费端引起不必要的消息丢失。
如果一个函数f(x) 满足:f(f(x)) = f(x),则函数f(x) 满足幂等性。也就是说,消息就算拿到重复,但是不会影响最后的结果。
二,问题的排查,可靠性分析
RabbitMQ 中可以使用Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费 消息的记录,方便RabbitMQ 的使用者进行调试、排错等。
Firehose 的原理是将生产者投递给RabbitMQ 的消息,或者RabbitMQ 投递给消费者的消息按照指 定的格式发送到默认的交换器上。这个默认的交换器的名称为 amq.rabbitmq.trace ,它是一个 topic 类型的交换器。发送到这个交换器上的消息的路由键为 publish.{exchangename} 和 deliver. {queuename} 。其中 exchangename 和 queuename 为交换器和队列的名称,分别对应生产者投递到交 换器的消息和消费者从队列中获取的消息。
开启Firehose命令:rabbitmqctl trace_on [-p vhost]
对应的关闭命令为:rabbitmqctl trace_off [-p vhost]
三,TTL机制
应用场景:在APP下单,订单创建成功,等待支付,一般会给30分钟的时间,开始倒计时。如果在这段时间内 用户没有支付,则默认订单取消
RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种 过期的机制来做兜底。
目前有两种方法可以设置消息的TTL:
- 1. 通过Queue属性设置,队列中所有消息都有相同的过期时间。
- 2. 对消息自身进行单独设置,每条消息的TTL 可以不同。
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存 时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当 然,“死信”也是可以被取出来消费的
Map<String, Object> arguments = new HashMap<>();
// 设置队列的TTL
arguments.put("x-message-ttl", 30000);
// 设置队列的空闲存活时间(如该队列根本没有消费者,一直没有使用,队列可以存活多久)
arguments.put("x-expires", 10000);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
四,死信队列
在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消 息就会被发送到该死信队列上,方便查看消息失败的原因。
DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter) 之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。
以下几种情况导致消息变为死信:
- 1. 消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
- 2. 消息过期;
- 3. 队列达到最大长度。
对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被 消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
Map<String, Object> arguments = new HashMap<>();
// 设置队列TTL
arguments.put("x-message-ttl", 10000);
// 设置该队列所关联的死信交换器(当队列消息TTL到期后依然没有消费,则加入死信队列)
arguments.put("x-dead-letter-exchange", "exchange.dlx");
// 设置该队列所关联的死信交换器的routingKey,如果没有特殊指定,使用原队列的routingKey
arguments.put("x-dead-letter-routing-key", "routing.key.dlx.test");
channel.queueDeclare("queue.biz", true, false, false, arguments)
五,延迟队列
延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消 费。
1,可以通过设值TTL超时之后插入死信队列,然后通过消费死信队列来实现延迟消费。(但是因为消息队列消息过期问题时候,会去先判断前面的部分去删除可能会导致数据丢失)
2,使用rabbitmq_delayed_message_exchange插件实现。
- 1. 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
- 2. 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列 (queue)并把消息给它
- 3. 队列(queue)再把消息发送给监听它的消费者(customer)
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases