RabbitMQ如何在各个环节保证消息不丢失

x33g5p2x  于2021-09-21 转载在 RabbitMQ  
字(2.6k)|赞(0)|评价(0)|浏览(383)

一、前言

消息队列,简单来讲,就是通过队列来存储与传递消息,具有解耦服务、削峰填谷、异步处理等优势。

由于业务发展的需要,引入消息队列后,确实能利用以上的优势,但同时提高了系统的复杂性,降低了可用性。

也会带来各种各样的问题,例如消息丢失、乱序与重复消费等,今天就简单讲讲如何保证消息不丢失。

如果队列只是去传递一些日志型的消息,那丢失也无所谓。但如果传递的是一些核心业务类型的消息,那就要保证消息不能丢失。

消息从生产到消费,要经历三个阶段,分别是生产、队列转发与消费,每个环节都可能丢失消息。

以下以RabbitMQ为例,来说明各个阶段会产生的问题以及解决方式。在说明之前,先回顾一下RabbitMQ的一个基本架构图

 以上的架构图来自于RabbitMQ架构图和简介以及交换器模式


二、生产端投递到队列过程中可能丢失消息

1、生产端发送消息时,由于网络闪断原因,消息未到达mq

这种情况可以立即进行重试,但是一般也会失败。因为网络闪断的特性就是间歇性,较短时间内的重试大概率会失败。

这个时候,需要我们对发送失败的消息做出补偿。

2、生产端发送消息成功,mq也接收到了消息,刚准备处理时,mq宕机。

生产端无法感知消息是否已经正确到达交换机上,无法采取下一步的动作,例如是删除消息,还是重新投递,这就需要mq在适当的时机对生产端进行通知。

mq提供了两种方式

(1)事务机制,属于同步方式,消息发送完之后会阻塞等待mq回应。在此期间无法发送下一条消息,严重降低吞吐量与性能。

(2)confirm确认机制,属于异步方式,消息发送完之后不需要阻塞等待。当消息达到指定的队列后,mq将会主动回传一个ack,代表消息入队成功。

因此,这里将会对当前channel开启confirm机制,来显示地告知消息的处理进度。

3、mq接收到消息,还没落盘就ack

当mq接收到消息后,需要在落盘后通知生产端。如果不落盘就确认的话,mq一旦宕机,消息就会丢失,而生产端根本察觉不到。

想要在落盘后通知生产端,开启队列的confirm机制,即mq会对落盘后的消息进行异步ack。

4、mq落盘后ack,但由于网络闪断,生产端未收到ack

同第1点,需要作出补偿机制。

综合以上3点,需要在网络闪断时,作出相应的补偿机制。

可以先利用本地消息表(mysql或者redis)记录消息状态,发送并落盘成功后,立即删除该消息记录。

对于那些处理失败的消息,再使用定时任务进行重新发送即可。

初步的设计方案如图:

1、生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。

2、假设生产端此时将a,b,c三条数据入库,他们的状态都为发送中。

3、mq收到了a消息,mq落盘后回传ack,生产端接收到了ack后,将消息库中的a删除(当然你可以将其状态置为发送成功,看业务需要了)

4、mq接着收到了b消息,但回传ack时,由于网络闪断一直未能让生产端接收到。此时定时任务会根据预设的超时时间扫描到发送超时或mq处理超时的消息,对其进行重试。重试成功后,生产端对其进行删除。

5、c消息就没有这么好运,由于其他原因,比如路由键设置错误、队列被误删除等,始终无法路由到对应的队列中,导致重试一直失败。在达到最大次数后,将会进行报警通知,后续由人工处理。


 三、队列本身可能丢失消息

1、消息达到mq,但mq中出现内部错误,无法处理该消息

由于我们已经开启了confirm机制,这个时候mq会回传nack,代表处理失败。

对于这种问题,由以上的补偿方案可以解决。只要mq不回传ack,生产端就不删除消息。

2、消息还没来得及刷盘,mq就宕机了,重启后,消息丢失。

开启交换机、队列与消息的持久化,三者缺一不可。消息刷盘后,再批量异步回传ack。

3、开启持久化后,但是硬盘坏了,无法恢复数据。

镜像部署mq,消息在所有或部分副本中写完再回传ack。

mq有以下三种部署方案:

单节点部署,消息只存在与当前节点。硬盘坏了,那消息真的就无法恢复了。

集群部署

(1)默认的集群部署,但消息只会存在与当前节点中,并不会同步到其他节点,其他节点也仅只会同步该节点的队列结构。

(2)镜像部署,消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。


四、消费端可能丢失消息

1、消费端采用自动ack机制,还没有处理完毕,消费端宕机。

改为手动ack,当消息正确处理完成后,再通知mq。消费端处理消息异常后,回传nack,这样mq会把这条消息投递到另外一个消费端上。

2、消费端处理完消息后,回传ack时发生网络闪断,mq未收到ack。

mq会将超时未ack的消息重新放回队列。


五、注意点

1、mq的ack回传是批量异步的方式,生产端对ack的监听也是异步的

生产端生产一条消息后,mq接收到该消息,先进行落库,再进行ack回传,生产端收到ack后,再去删除消息记录。

如果上述过程是一个同步过程的话,那整个吞吐量以及性能可太低了。

所以mq为了提高效率,会等到消息在内存中达到一定数量的时候,统一进行落盘,再回传ack。(这种模式和Redis中NO策略下的AOF持久化,以及Mysql中的redolog刷盘很类似)。

当然生产端也不是傻乎乎地一直等待,而是往mq中投递一个消息后,设置对当前队列或者channel的一个监听器,在异步的回调方法中进行ack与nack的处理。

2、在重试的补偿机制下,消费端需要保证幂等。

在生产端长时间未收到ack或者nack的情况下,定时任务会该消息进行重试,因此会往队列中投递重复的消息,这时候就需要消费端保证幂等性。

在消费端拿到一个消息时,可以将消息中的业务参数组合成为一个key,利用数据库唯一索引或者redis来判断是否之前是否执行过。


六、总结

如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。

生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。

mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。

消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。

通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。

在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。

相关文章