一.MQ的相关概念
什么是MQ?
为什么要用 MQ?
MQ的选择?
**
**
prodeucer:生产者,用来发送消息
Consumer:消费者,用来处理消息
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每次访问MQ都建立Connection是很浪费性能的,Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
每次连接都会创建一个Channel,通过Channel进行一系列操作(创建queue,发消息)
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker,就是Rabbit的实例。
Exchange(交换机):message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。
Queue(队列):消息最终被送到这里等待 consumer 取走
Exchange说明:
类型:
死信队列:
顾名思义就是无法被消费的消息(由于特定的原因导致 queue 中的某些消息无法被消费)。
解释图:设置死信队列和死信交换机,过期的消息发送给死信交换机,死信交换机再发给死信队列,由特定的消费者进行处理。
TTL:消息/队列最大的存活时间。
每一个消息可以单独设置TTL,每一个队列也可以设置TTL。
延迟队列:
延迟队列是死信队列的一种形式。看上图,如果我们不设置C1,那么所有的消息只能在队列中等待到过期,进入死信队列,马上被消费,相当于延迟消费。
使用场景:
订单在十分钟之内未支付则自动取消
用户发起退款,如果三天内没有得到处理则通知相关运营人员。
预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议...
存在问题:不同的延迟时长,就需要增加不同的延迟队列。如下图:如果再增加一个1小时后执行的事务,就需要再加一个1小时的延迟队列。
进阶1:使用一个延迟队列,不要设置队列的TTL,设置每一个消息的TTL。这样不同的消息就会在不同的时间点后进行消费。
新问题:RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
进阶2:使用延迟插件可以解决这个问题。
优先队列:队列者设置为优先级队列,发消息的时候也设置优先级消息。这样才能保证真正的优先
镜像队列:在Rabbit集群中,如何保证队列同步到其他的节点,就使用镜像队列。实现:随便找一个节点添加一个策略policy就可以了。
四.消息可靠性怎么保证?
要保证消息不丢失,需要三方面都进行保证:生产者(发布确认机制,事务机制),消费者(消息应答机制,死信队列),MQ(持久化,集群)
发布确认:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了。如果超时没有收到消息,或者收到未收到的消息,进行重发或者警报管理员。
单个发布确认:是一种同步确认发布的方式,发布一个消息之后只有它被确认发布,后续的消息才能继续发布。channel.waitForConfirms()
缺点:效率低
批量发布确认:也是一种同步确认发布的方式,先发布一批消息然后一起确认,可以极大地提高吞吐量
缺点:一旦出现问题但很难推断出是那 消息出现了问题。
异步发布确认:生产者只负责不断的发就行,确认的方面由MQ自己决定,收到未收到都会回复一个消息(回调函数)。不会造成生产者等待。用下图的俩个回调方法实现
每一个Channel中的消息都有自己的编号,MQ只回复对应的编号就行
事务机制:RabbitMQ 客户端中与事务机制相关的方法有三个
channel.txSelect
用于将当前的信道设置成事务模式。channel.txCommit
用于提交事务 。channel.txRollback
用于事务回滚,如果在事务提交执行之前由于 RabbitMQ 异常崩溃或者其他原因抛出异常,通过txRollback来回滚。消息应答:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。如果没收到ack,消息自动重新入队。
自动应答:默认的,消费者一收到消息就马上进行应答。
缺点:可能消费者后续处理出错,就没有办法了。
手动应答:可以在消费者处理完逻辑之后,再进行应答,就保证了消息的可靠。实现:Channel.basicAck()
手动应答还可以设置批量应答:比如发送消息1,2,3,4,5。批量应答是只要收到消息5,只应答5,前面的几个消息就不需要应答了。
持久化:需要将队列和消息都进行持久化。
刷盘的实现:刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘
同步刷盘:在Broker把消息写到CommitLog映射区后,就会等待写入完成。
异步刷盘:只是唤醒对应的线程,不保证执行的时机,流程如图所示。
五.如何解决消息重复,消息积压,消息有序性?
消息重复:采用幂等性接口。
消息积压:
因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。
解决方案:
定位消费慢的原因,如果有bug,调bug。或者进行数据库层面的调优。
多增加几个队列和消费者。
消息有序性
有一些消息需要保证顺序,比如下订单,要先锁库存,才能生成订单信息
解决方案:
使用CompletableFuture工具类进行异步编排。
全局有序:只能由一个生产者往Broker发送消息,并且一个Broker内部只能有一个队列。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!
部分有序:消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。
六**.其他小问题**
vhost 是什么?起什么作用?
vhost
可以理解为虚拟 broker
,即 mini-RabbitMQ server
。queue
、exchange
和 binding
等,但最最重要的是,其拥有独立的权限系统,可以做到 vhost
范围的用户控制。RabbitMQ
的全局角度,vhost
可以作为不同权限隔离的手段(一个典型的例子就是不同的应用可以跑在不同的 vhost
中)。消息基于什么传输?
消息如何分发?
什么是元数据?元数据分为哪些类型?包括哪些内容?与 cluster 相关的元数据有哪些?元数据是如何保存的?元数据在 cluster 中是如何分布的?
cluster
模式下,元数据主要分为 Queue
元数据(queue 名字和属性等)、Exchange
元数据(exchange 名字、类型和属性等)、Binding
元数据(存放路由关系的查找表)、Vhost
元数据(vhost 范围内针对前三者的名字空间约束和安全属性设置)。cluster
模式下,还包括 cluster 中node 位置信息和 node 关系信息。RabbitMQ的集群模式和集群节点类型?
寄语:年轻人,你的职责是平整土地,而非焦虑时光,你做三四月的事,在七八月自有答案 ---余世存《时间之书》
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://www.cnblogs.com/monkey-xuan/p/15865509.html
内容来源于网络,如有侵权,请联系作者删除!