消息队列,简单来讲,就是通过队列来存储与传递消息,具有解耦服务、削峰填谷、异步处理等优势。
由于业务发展的需要,引入消息队列后,确实能利用以上的优势,但同时提高了系统的复杂性,降低了可用性。
也会带来各种各样的问题,例如消息丢失、乱序与重复消费等,今天就简单讲讲如何保证消息不丢失。
如果队列只是去传递一些日志型的消息,那丢失也无所谓。但如果传递的是一些核心业务类型的消息,那就要保证消息不能丢失。
消息从生产到消费,要经历三个阶段,分别是生产、队列转发与消费,每个环节都可能丢失消息。
以下以RabbitMQ为例,来说明各个阶段会产生的问题以及解决方式。在说明之前,先回顾一下RabbitMQ的一个基本架构图
以上的架构图来自于RabbitMQ架构图和简介以及交换器模式
这种情况可以立即进行重试,但是一般也会失败。因为网络闪断的特性就是间歇性,较短时间内的重试大概率会失败。
这个时候,需要我们对发送失败的消息做出补偿。
生产端无法感知消息是否已经正确到达交换机上,无法采取下一步的动作,例如是删除消息,还是重新投递,这就需要mq在适当的时机对生产端进行通知。
mq提供了两种方式
(1)事务机制,属于同步方式,消息发送完之后会阻塞等待mq回应。在此期间无法发送下一条消息,严重降低吞吐量与性能。
(2)confirm确认机制,属于异步方式,消息发送完之后不需要阻塞等待。当消息达到指定的队列后,mq将会主动回传一个ack,代表消息入队成功。
因此,这里将会对当前channel开启confirm机制,来显示地告知消息的处理进度。
当mq接收到消息后,需要在落盘后通知生产端。如果不落盘就确认的话,mq一旦宕机,消息就会丢失,而生产端根本察觉不到。
想要在落盘后通知生产端,开启队列的confirm机制,即mq会对落盘后的消息进行异步ack。
同第1点,需要作出补偿机制。
综合以上3点,需要在网络闪断时,作出相应的补偿机制。
可以先利用本地消息表(mysql或者redis)记录消息状态,发送并落盘成功后,立即删除该消息记录。
对于那些处理失败的消息,再使用定时任务进行重新发送即可。
初步的设计方案如图:
1、生产端首先将业务数据以及消息数据入库,需要在同一个事务中,消息数据入库失败,则整体回滚。
2、假设生产端此时将a,b,c三条数据入库,他们的状态都为发送中。
3、mq收到了a消息,mq落盘后回传ack,生产端接收到了ack后,将消息库中的a删除(当然你可以将其状态置为发送成功,看业务需要了)
4、mq接着收到了b消息,但回传ack时,由于网络闪断一直未能让生产端接收到。此时定时任务会根据预设的超时时间扫描到发送超时或mq处理超时的消息,对其进行重试。重试成功后,生产端对其进行删除。
5、c消息就没有这么好运,由于其他原因,比如路由键设置错误、队列被误删除等,始终无法路由到对应的队列中,导致重试一直失败。在达到最大次数后,将会进行报警通知,后续由人工处理。
由于我们已经开启了confirm机制,这个时候mq会回传nack,代表处理失败。
对于这种问题,由以上的补偿方案可以解决。只要mq不回传ack,生产端就不删除消息。
开启交换机、队列与消息的持久化,三者缺一不可。消息刷盘后,再批量异步回传ack。
镜像部署mq,消息在所有或部分副本中写完再回传ack。
mq有以下三种部署方案:
单节点部署,消息只存在与当前节点。硬盘坏了,那消息真的就无法恢复了。
集群部署
(1)默认的集群部署,但消息只会存在与当前节点中,并不会同步到其他节点,其他节点也仅只会同步该节点的队列结构。
(2)镜像部署,消息会同步到其他节点上,可以设置同步的节点个数,但吞吐量会下降。
改为手动ack,当消息正确处理完成后,再通知mq。消费端处理消息异常后,回传nack,这样mq会把这条消息投递到另外一个消费端上。
mq会将超时未ack的消息重新放回队列。
生产端生产一条消息后,mq接收到该消息,先进行落库,再进行ack回传,生产端收到ack后,再去删除消息记录。
如果上述过程是一个同步过程的话,那整个吞吐量以及性能可太低了。
所以mq为了提高效率,会等到消息在内存中达到一定数量的时候,统一进行落盘,再回传ack。(这种模式和Redis中NO策略下的AOF持久化,以及Mysql中的redolog刷盘很类似)。
当然生产端也不是傻乎乎地一直等待,而是往mq中投递一个消息后,设置对当前队列或者channel的一个监听器,在异步的回调方法中进行ack与nack的处理。
在生产端长时间未收到ack或者nack的情况下,定时任务会该消息进行重试,因此会往队列中投递重复的消息,这时候就需要消费端保证幂等性。
在消费端拿到一个消息时,可以将消息中的业务参数组合成为一个key,利用数据库唯一索引或者redis来判断是否之前是否执行过。
如果需要保证消息在整条链路中不丢失,那就需要生产端、mq自身与消费端共同去保障。
生产端:对生产的消息进行状态标记,开启confirm机制,依据mq的响应来更新消息状态,使用定时任务重新投递超时的消息,多次投递失败进行报警。
mq自身:开启持久化,并在落盘后再进行ack。如果是镜像部署模式,需要在同步到多个副本之后再进行ack。
消费端:开启手动ack模式,在业务处理完成后再进行ack,并且需要保证幂等。
通过以上的处理,理论上不存在消息丢失的情况,但是系统的吞吐量以及性能有所下降。
在实际开发中,需要考虑消息丢失的影响程度,来做出对可靠性以及性能之间的权衡。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/qq_33591903/article/details/120383336
内容来源于网络,如有侵权,请联系作者删除!