2022 RocketMQ精选面试题50道

x33g5p2x  于2023-01-13 发布在 其他  
字(22.5k)|赞(0)|评价(0)|浏览(4325)

1、 多个mq如何选型?

MQ描述
RabbitMQerlang开发,对消息堆积的支持并不好,当大量消息积压的时候,会导致 RabbitMQ 的性能急剧下降。每秒钟可以处理几万到十几万条消息。
RocketMQjava开发,面向互联网集群化功能丰富,对在线业务的响应时延做了很多的优化,大多数情况下可以做到毫秒级的响应,每秒钟大概能处理几十万条消息。
KafkaScala开发,面向日志功能丰富,性能最高。当你的业务场景中,每秒钟消息数量没有那么多的时候,Kafka 的时延反而会比较高。所以,Kafka 不太适合在线业务场景。
ActiveMQjava开发,简单,稳定,性能不如前面三个。小型系统用也ok,但是不推荐。推荐用互联网主流的。

2、 为什么要使用MQ?

因为项目比较大,做了分布式系统,所有远程服务调用请求都是同步执行经常出问题,所以引入了mq

作用描述
解耦系统耦合度降低,没有强依赖关系
异步不需要同步执行的远程调用可以有效提高响应时间
削峰请求达到峰值后,后端service还可以保持固定消费速率消费,不会被压垮

3、 RocketMQ由哪些角色组成,每个角色作用和特点是什么?

角色作用
Nameserver无状态,动态列表;这也是和zookeeper的重要区别之一。zookeeper是有状态的。
Producer消息生产者,负责发消息到Broker。
Broker就是MQ本身,负责收发消息、持久化消息等。
Consumer消息消费者,负责从Broker上拉取消息进行消费,消费完进行ack。

4、 RocketMQ中的Topic和JMS的queue有什么区别?

queue就是来源于数据结构的FIFO队列。而Topic是个抽象的概念,每个Topic底层对应N个queue,而数据也真实存在queue上的。

5、 RocketMQ Broker中的消息被消费后会立即删除吗?

不会,每条消息都会持久化到CommitLog中,每个Consumer连接到Broker后会维持消费进度信息,当有消息消费后只是当前Consumer的消费进度(CommitLog的offset)更新了。

追问:那么消息会堆积吗?什么时候清理过期消息?

4.6版本默认48小时后会删除不再使用的CommitLog文件

  • 检查这个文件最后访问时间
  • 判断是否大于过期时间
  • 指定时间删除,默认凌晨4点

源码如下:

/*** {@linkorg.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#isTimeToDelete()}
 */
private booleanisTimeToDelete() {
    //when = "04";
    String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
    //是04点,就返回true
    if(UtilAll.isItTimeToDo(when)) {
        return true;
    }
 //不是04点,返回false
    return false;
}

/*** {@linkorg.apache.rocketmq.store.DefaultMessageStore.CleanCommitLogService#deleteExpiredFiles()}
 */
private voiddeleteExpiredFiles() {
    //isTimeToDelete()这个方法是判断是不是凌晨四点,是的话就执行删除逻辑。
    if(isTimeToDelete()) {
        //默认是72,但是broker配置文件默认改成了48,所以新版本都是48。
        long fileReservedTime = 48 * 60 * 60 * 1000;
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(72 * 60 * 60 * 1000, xx, xx, xx);
    }
}
                                                                       
/*** {@linkorg.apache.rocketmq.store.CommitLog#deleteExpiredFile()}
 */
public intdeleteExpiredFile(xxx) {
    //这个方法的主逻辑就是遍历查找最后更改时间+过期时间,小于当前系统时间的话就删了(也就是小于48小时)。
    return this.mappedFileQueue.deleteExpiredFileByTime(72 * 60 * 60 * 1000, xx, xx, xx);
}

6、 RocketMQ消费模式有几种?

消费模型由Consumer决定,消费维度为Topic。

  • 集群消费
    1.一条消息只会被同Group中的一个Consumer消费

2.多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据

  • 广播消费
    消息将对一 个Consumer Group 下的各个 Consumer 实例都消费一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

7、 消费消息是push还是pull?

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启

源码如下:

//{@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
//看到没,这是一只披着羊皮的狼,名字叫PushConsumerImpl,实际干的确是pull的活。

//拉取消息,结果放到pullCallback里
this.pullAPIWrapper.pullKernelImpl(pullCallback);

追问:为什么要主动拉取消息而不使用事件监听方式?

事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。

8、 broker如何处理拉取请求的?

Consumer首次请求Broker

  • Broker中是否有符合条件的消息

  • 有 ->

  • 响应Consumer

  • 等待下次Consumer的请求

  • 没有

  • DefaultMessageStore#ReputMessageService#run方法

  • PullRequestHoldService 来Hold连接,每个5s执行一次检查pullRequestTable有没有消息,有的话立即推送

  • 每隔1ms检查commitLog中是否有新消息,有的话写入到pullRequestTable

  • 当有新消息的时候返回请求

  • 挂起consumer的请求,即不断开连接,也不返回数据

  • 使用consumer的offset,

9、 RocketMQ如何做负载均衡?

通过Topic在多Broker中分布式存储实现。

producer端

发送端指定message queue发送消息到相应的broker,来达到写入时的负载均衡

  • 提升写入吞吐量,当多个producer同时向一个broker写入数据的时候,性能会下降
  • 消息分布在多broker中,为负载消费做准备

默认策略是随机选择:

  • producer维护一个index
  • 每次取节点会自增
  • index向所有broker个数取余
  • 自带容错策略

其他实现:

  • SelectMessageQueueByHash

  • hash的是传入的args

  • SelectMessageQueueByRandom

  • SelectMessageQueueByMachineRoom 没有实现

也可以自定义实现MessageQueueSelector接口中的select方法

MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);

consumer端

采用的是平均分配算法来进行负载均衡。

其他负载均衡算法

平均分配策略(默认)(AllocateMessageQueueAveragely) 环形分配策略(AllocateMessageQueueAveragelyByCircle) 手动配置分配策略(AllocateMessageQueueByConfig) 机房分配策略(AllocateMessageQueueByMachineRoom) 一致性哈希分配策略(AllocateMessageQueueConsistentHash) 靠近机房策略(AllocateMachineRoomNearby)

追问:当消费负载均衡consumer和queue不对等的时候会发生什么?

Consumer和queue会优先平均分配,如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况,如果Consumer等于queue的个数,那就是一个Consumer消费一个queue,如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。

10、 消息重复消费

影响消息正常发送和消费的重要原因是网络的不确定性。

引起重复消费的原因

  • ACK

正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除

当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer

  • 消费模式

在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次

解决方案

  • 数据库表

处理消息前,使用消息主键在表中带有约束的字段中insert

  • Map

单机时可以使用map ConcurrentHashMap -> putIfAbsent   guava cache

  • Redis

分布式锁搞起来。

11、 如何让RocketMQ保证消息的顺序消费

你们线上业务用消息中间件的时候,是否需要保证消息的顺序性?

如果不需要保证消息顺序,为什么不需要?假如我有一个场景要保证消息的顺序,你们应该如何保证?

首先多个queue只能保证单个queue里的顺序,queue是典型的FIFO,天然顺序。多个queue同时消费是无法绝对保证消息的有序性的。所以总结如下:

同一topic,同一个QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个queue里的消息。

追问:怎么保证消息发到同一个queue?

Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断 i % 2 == 0 ,那就都放到queue1里,否则放到queue2里。

for (int i = 0; i < 5; i++) {
    Message message = new Message("orderTopic", ("hello!" +i).getBytes());
    producer.send(
        //要发的那条消息
message,
        //queue 选择器 ,向 topic中的哪个queue去写消息
        newMessageQueueSelector() {
            //手动 选择一个queue
@Override
            publicMessageQueue select(
                //当前topic 里面包含的所有queue
                List<MessageQueue>mqs,
                //具体要发的那条消息
Message msg,
                //对应到 send() 里的 args,也就是2000前面的那个0
Object arg) {
                //向固定的一个queue里写消息,比如这里就是向第一个queue里写消息
                if (Integer.parseInt(arg.toString()) % 2 == 0) {
                    return mqs.get(0);
                } else{
                    return mqs.get(1);
                }
            }
        },
        //自定义参数:0
        //2000代表2000毫秒超时时间
        i, 2000);
}

12、 RocketMQ如何保证消息不丢失

首先在如下三个部分都可能会出现丢失消息的情况:

  • Producer端
  • Broker端
  • Consumer端

Producer端如何保证消息不丢失

  • 采取send()同步发消息,发送结果是同步感知的。

  • 发送失败后可以重试,设置重试次数。默认3次。
    producer.setRetryTimesWhenSendFailed(10);

  • 集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。

Broker端如何保证消息不丢失

  • 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
    flushDiskType = SYNC_FLUSH

  • 集群部署,主从模式,高可用。

Consumer端如何保证消息不丢失

  • 完全消费正常后在进行手动ack确认。

13、 rocketMQ的消息堆积如何处理

下游消费系统如果宕机了,导致几百万条消息在消息中间件里积压,此时怎么处理?

你们线上是否遇到过消息积压的生产故障?如果没遇到过,你考虑一下如何应对?

首先要找到是什么原因导致的消息堆积,是Producer太多了,Consumer太少了导致的还是说其他情况,总之先定位问题。

然后看下消息消费速度是否正常,正常的话,可以通过上线更多consumer临时解决消息堆积问题

追问:如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?

  • 准备一个临时的topic
  • queue的数量是堆积的几倍
  • queue分布到多Broker中
  • 上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去
  • 上线N台Consumer同时消费临时Topic中的数据
  • 改bug
  • 恢复原来的Consumer,继续消费之前的Topic

追问:堆积时间过长消息超时了?

RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。

追问:堆积的消息会不会进死信队列?

不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18次(默认18次,网上所有文章都说是16次,无一例外。但是我没搞懂为啥是16次,这不是18个时间吗 ?)才会进入死信队列(%DLQ%+ConsumerGroup)。

源码如下:

public classMessageStoreConfig {
    //每隔如下时间会进行重试,到最后一次时间重试失败的话就进入死信队列了。
 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

14、 RocketMQ在分布式事务支持这块机制的底层原理?

你们用的是RocketMQ?RocketMQ很大的一个特点是对分布式事务的支持,你说说他在分布式事务支持这块机制的底层原理?

分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性

RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致

RocketMQ实现方式:

**Half Message:**预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中

**检查事务状态:**Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。

**超时:**如果超过回查次数,默认回滚消息。

也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。

15、 如果让你来动手实现一个分布式消息中间件,整体架构你会如何设计实现?

我个人觉得从以下几个点回答吧:

  • 需要考虑能快速扩容、天然支持集群
  • 持久化的姿势
  • 高可用性
  • 数据0丢失的考虑
  • 服务端部署简单、client端使用简单

16、 看过RocketMQ 的源码没有。如果看过,说说你对RocketMQ 源码的理解?

要真让我说,我会吐槽蛮烂的,首先没任何注释,可能是之前阿里巴巴写了中文注释,捐赠给apache后,apache觉得中文注释不能留,自己又懒得写英文注释,就都给删了。里面比较典型的设计模式有单例、工厂、策略、门面模式。单例工厂无处不在,策略印象深刻比如发消息和消费消息的时候queue的负载均衡就是N个策略算法类,有随机、hash等,这也是能够快速扩容天然支持集群的必要原因之一。持久化做的也比较完善,采取的CommitLog来落盘,同步异步两种方式。

17、 高吞吐量下如何优化生产者和消费者的性能?

开发

  • 同一group下,多机部署,并行消费

  • 单个Consumer提高消费线程个数

  • 批量消费

  • 消息批量拉取

  • 业务逻辑批量处理

运维

  • 网卡调优
  • jvm调优
  • 多线程与cpu调优
  • Page Cache

18、 再说说RocketMQ 是如何保证数据的高容错性的?

  • 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
  • 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
  • 如果上次失败的Broker可用那么还是会选择该Broker的队列
  • 如果上述情况失败,则随机选择一个进行发送
  • 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间
    其实就是send消息的时候queue的选择。源码在如下:

org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue()

19、 任何一台Broker突然宕机了怎么办?

Broker主从架构以及多副本策略。Master收到消息后会同步给Slave,这样一条消息就不止一份了,Master宕机了还有slave中的消息可用,保证了MQ的可靠性和高可用性。而且Rocket MQ4.5.0开始就支持了Dlegder模式,基于raft的,做到了真正意义的HA。

20、 Broker把自己的信息注册到哪个NameServer上?

这么问明显在坑你,因为Broker会向所有的NameServer上注册自己的信息,而不是某一个,是每一个,全部!

21、 你们为什么使用mq?具体的使用场景是什么?

mq的作用很简单,削峰填谷。以电商交易下单的场景来说,正向交易的过程可能涉及到创建订单、扣减库存、扣减活动预算、扣减积分等等。每个接口的耗时如果是100ms,那么理论上整个下单的链路就需要耗费400ms,这个时间显然是太长了。

如果这些操作全部同步处理的话,首先调用链路太长影响接口性能,其次分布式事务的问题很难处理,这时候像扣减预算和积分这种对实时一致性要求没有那么高的请求,完全就可以通过mq异步的方式去处理了。同时,考虑到异步带来的不一致的问题,我们可以通过job去重试保证接口调用成功,而且一般公司都会有核对的平台,比如下单成功但是未扣减积分的这种问题可以通过核对作为兜底的处理方案。

使用mq之后我们的链路变简单了,同时异步发送消息我们的整个系统的抗压能力也上升了。

22、 那你们使用什么mq?基于什么做的选型?

我们主要调研了几个主流的mq,kafka、rabbitmq、rocketmq、activemq,选型我们主要基于以下几个点去考虑:

  1. 由于我们系统的qps压力比较大,所以性能是首要考虑的要素。
  2. 开发语言,由于我们的开发语言是java,主要是为了方便二次开发。
  3. 对于高并发的业务场景是必须的,所以需要支持分布式架构的设计。
  4. 功能全面,由于不同的业务场景,可能会用到顺序消息、事务消息等。

基于以上几个考虑,我们最终选择了RocketMQ。

 KafkaRocketMQRabbitMQActiveMQ
单机吞吐量10万级10万级万级万级
开发语言ScalaJavaErlangJava
高可用分布式架构分布式架构主从架构主从架构
性能ms级ms级us级ms级
功能只支持主要的MQ功能顺序消息、事务消息等功能完善并发强、性能好、延时低成熟的社区产品、文档丰富

23、 你上面提到异步发送,那消息可靠性怎么保证?

消息丢失可能发生在生产者发送消息、MQ本身丢失消息、消费者丢失消息3个方面。

生产者丢失

生产者丢失消息的可能点在于程序发送失败抛异常了没有重试处理,或者发送的过程成功但是过程中网络闪断MQ没收到,消息就丢失了。

由于同步发送的一般不会出现这样使用方式,所以我们就不考虑同步发送的问题,我们基于异步发送的场景来说。

异步发送分为两个方式:异步有回调和异步无回调,无回调的方式,生产者发送完后不管结果可能就会造成消息丢失,而通过异步发送+回调通知+本地消息表的形式我们就可以做出一个解决方案。以下单的场景举例。

  1. 下单后先保存本地数据和MQ消息表,这时候消息的状态是发送中,如果本地事务失败,那么下单失败,事务回滚。
  2. 下单成功,直接返回客户端成功,异步发送MQ消息
  3. MQ回调通知消息发送结果,对应更新数据库MQ发送状态
  4. JOB轮询超过一定时间(时间根据业务配置)还未发送成功的消息去重试
  5. 在监控平台配置或者JOB程序处理超过一定次数一直发送不成功的消息,告警,人工介入。

一般而言,对于大部分场景来说异步回调的形式就可以了,只有那种需要完全保证不能丢失消息的场景我们做一套完整的解决方案。

MQ丢失

如果生产者保证消息发送到MQ,而MQ收到消息后还在内存中,这时候宕机了又没来得及同步给从节点,就有可能导致消息丢失。

比如RocketMQ:

RocketMQ分为同步刷盘和异步刷盘两种方式,默认的是异步刷盘,就有可能导致消息还未刷到硬盘上就丢失了,可以通过设置为同步刷盘的方式来保证消息可靠性,这样即使MQ挂了,恢复的时候也可以从磁盘中去恢复消息。

比如Kafka也可以通过配置做到:

acks=all 只有参与复制的所有节点全部收到消息,才返回生产者成功。这样的话除非所有的节点都挂了,消息才会丢失。
replication.factor=N,设置大于1的数,这会要求每个partion至少有2个副本
min.insync.replicas=N,设置大于1的数,这会要求leader至少感知到一个follower还保持着连接
retries=N,设置一个非常大的值,让生产者发送失败一直重试

虽然我们可以通过配置的方式来达到MQ本身高可用的目的,但是都对性能有损耗,怎样配置需要根据业务做出权衡。

消费者丢失

消费者丢失消息的场景:消费者刚收到消息,此时服务器宕机,MQ认为消费者已经消费,不会重复发送消息,消息丢失。

RocketMQ默认是需要消费者回复ack确认,而kafka需要手动开启配置关闭自动offset。

消费方不返回ack确认,重发的机制根据MQ类型的不同发送时间间隔、次数都不尽相同,如果重试超过次数之后会进入死信队列,需要手工来处理了。(Kafka没有这些)

24、 你说到消费者消费失败的问题,那么如果一直消费失败导致消息积压怎么处理?

因为考虑到时消费者消费一直出错的问题,那么我们可以从以下几个角度来考虑:

  1. 消费者出错,肯定是程序或者其他问题导致的,如果容易修复,先把问题修复,让consumer恢复正常消费
  2. 如果时间来不及处理很麻烦,做转发处理,写一个临时的consumer消费方案,先把消息消费,然后再转发到一个新的topic和MQ资源,这个新的topic的机器资源单独申请,要能承载住当前积压的消息
  3. 处理完积压数据后,修复consumer,去消费新的MQ和现有的MQ数据,新MQ消费完成后恢复原状

25、 那如果消息积压达到磁盘上限,消息被删除了怎么办?

这。。。他妈都删除了我有啥办法啊。。。冷静,再想想。。有了。

最初,我们发送的消息记录是落库保存了的,而转发发送的数据也保存了,那么我们就可以通过这部分数据来找到丢失的那部分数据,再单独跑个脚本重发就可以了。如果转发的程序没有落库,那就和消费方的记录去做对比,只是过程会更艰难一点。

26、 说了这么多,那你说说RocketMQ实现原理吧?

RocketMQ由NameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干Broker(RocketMQ进程)组成,它的架构原理是这样的:

  1. Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳
  2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
  3. Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费

27、 为什么RocketMQ不使用Zookeeper作为注册中心呢?

我认为有以下几个点是不使用zookeeper的原因:

  1. 根据CAP理论,同时最多只能满足两个点,而zookeeper满足的是CP,也就是说zookeeper并不能保证服务的可用性,zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的考虑,NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而zookeeper的写是不可扩展的,而zookeeper要解决这个问题只能通过划分领域,划分多个zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  3. 持久化的机制来带的问题,ZooKeeper 的 ZAB 协议对每一个写请求,会在每个 ZooKeeper 节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  4. 消息发送应该弱依赖注册中心,而RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

28、 那Broker是怎么保存数据的呢?

RocketMQ主要的存储文件包括commitlog文件、consumequeue文件、indexfile文件。

Broker在收到消息之后,会把消息保存到commitlog的文件当中,而同时在分布式的存储当中,每个broker都会保存一部分topic的数据,同时,每个topic对应的messagequeue下都会生成consumequeue文件用于保存commitlog的物理位置偏移量offset,indexfile中会保存key和offset的对应关系。

CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。

由于同一个topic的消息并不是连续的存储在commitlog中,消费者如果直接从commitlog获取消息效率非常低,所以通过consumequeue保存commitlog中消息的偏移量的物理地址,这样消费者在消费的时候先从consumequeue中根据偏移量定位到具体的commitlog物理文件,然后根据一定的规则(offset和文件大小取模)在commitlog中快速定位。

29、 Master和Slave之间是怎么同步数据的呢?

而消息在master和slave之间的同步是根据raft协议来进行的:

  1. 在broker收到消息后,会被标记为uncommitted状态
  2. 然后会把消息发送给所有的slave
  3. slave在收到消息之后返回ack响应给master
  4. master在收到超过半数的ack之后,把消息标记为committed
  5. 发送committed消息给所有slave,slave也修改状态为committed

30、 你知道RocketMQ为什么速度快吗?

是因为使用了顺序存储、Page Cache和异步刷盘。

  1. 我们在写入commitlog的时候是顺序写入的,这样比随机写入的性能就会提高很多
  2. 写入commitlog的时候并不是直接写入磁盘,而是先写入操作系统的PageCache
  3. 最后由操作系统异步将缓存中的数据刷到磁盘

31、 什么是事务、半事务消息?怎么实现的?

事务消息就是MQ提供的类似XA的分布式事务能力,通过事务消息可以达到分布式事务的最终一致性。

半事务消息就是MQ收到了生产者的消息,但是没有收到二次确认,不能投递的消息。

实现原理如下:

  1. 生产者先发送一条半事务消息到MQ
  2. MQ收到消息后返回ack确认
  3. 生产者开始执行本地事务
  4. 如果事务执行成功发送commit到MQ,失败发送rollback
  5. 如果MQ长时间未收到生产者的二次确认commit或者rollback,MQ对生产者发起消息回查
  6. 生产者查询事务执行最终状态
  7. 根据查询事务状态再次提交二次确认

最终,如果MQ收到二次确认commit,就可以把消息投递给消费者,反之如果是rollback,消息会保存下来并且在3天后被删除。

32、 为什么要用RocketMq?

总得来说,RocketMq具有以下几个优势:

吞吐量高:单机吞吐量可达十万级
可用性高:分布式架构
消息可靠性高:经过参数优化配置,消息可以做到0丢失
功能支持完善:MQ功能较为完善,还是分布式的,扩展性好
支持10亿级别的消息堆积:不会因为堆积导致性能下降
源码是java:方便我们查看源码了解它的每个环节的实现逻辑,并针对不同的业务场景进行扩展
可靠性高:天生为金融互联网领域而生,对于要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
稳定性高:RoketMQ在上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验

33、 RocketMq的部署架构了解吗?

这个是rocketMq的集群架构图,里面包含了四个主要部分:NameServer集群,Producer集群,Cosumer集群以及Broker集群

NameServer 担任路由消息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表分别进行发送消息和消费消息。nameServer由多个无状态的节点构成,节点之间无任何信息同步

broker会定期向NameServer以发送心跳包的方式,轮询向所有NameServer注册以下元数据信息

1)broker的基本信息(ip port等)

2)主题topic的地址信息

3)broker集群信息

4)存活的broker信息

5)filter 过滤器

也就是说,每个NameServer注册的信息都是一样的,而且是当前系统中的所有broker的元数据信息

Producer负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要

Broker,消息中转角色,负责存储消息、转发消息。在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备

Consumer负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费

34、 它有哪几种部署类型?分别有什么特点?

RocketMQ有4种部署类型

1)单Master

单机模式, 即只有一个Broker, 如果Broker宕机了, 会导致RocketMQ服务不可用, 不推荐使用

2)多Master模式

组成一个集群, 集群每个节点都是Master节点, 配置简单, 性能也是最高, 某节点宕机重启不会影响RocketMQ服务

缺点:如果某个节点宕机了, 会导致该节点存在未被消费的消息在节点恢复之前不能被消费

3)多Master多Slave模式,异步复制

每个Master配置一个Slave, 多对Master-Slave, Master与Slave消息采用异步复制方式, 主从消息一致只会有毫秒级的延迟

优点是弥补了多Master模式(无slave)下节点宕机后在恢复前不可订阅的问题。在Master宕机后, 消费者还可以从Slave节点进行消费。采用异步模式复制,提升了一定的吞吐量。总结一句就是,采用多Master多Slave模式,异步复制模式进行部署,系统将会有较低的延迟和较高的吞吐量

缺点就是如果Master宕机, 磁盘损坏的情况下, 如果没有及时将消息复制到Slave, 会导致有少量消息丢失

4)多Master多Slave模式,同步双写

与多Master多Slave模式,异步复制方式基本一致,唯一不同的是消息复制采用同步方式,只有master和slave都写成功以后,才会向客户端返回成功

优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高

缺点就是会降低消息写入的效率,并影响系统的吞吐量

实际部署中,一般会根据业务场景的所需要的性能和消息可靠性等方面来选择后两种

35、 你自己部署过RocketMq吗?简单说一下你当时部署的过程

由于我们项目中主要使用rocketMq做链路跟踪功能,因此需要比较高的性能,并且偶尔丢失几条消息也关系不大,所以我们就选择多Master多Slave模式,异步复制方式进行部署

部署过程简单说一下:
我部署的是双master和双slave模式集群,并部署了两个nameserver节点
1)服务器分配
分配是两台服务器,A和B,其中A服务器部署nameserv1,master1,slave2;B服务器部署nameserv2,master2和slave1节点

2)broker的配置
分别配置rocketmq安装目录下四个配置文件:

master1:/conf/2m-2s-async/broker-a.properties
slave2:/conf/2m-2s-async/broker-b-s.properties
master2:/conf/2m-2s-async/broker-b.properties
slave1:/conf/2m-2s-async/broker-a-s.properties

总的思路是
a.master节点的brokerId为0,slave节点的brokerId为1(大于0即可);
b.同一组broker的broker-Name相同,如master1和slave1都为broker-a;
c.每个broker节点配置相同的NameServer;
d.复制方式配置:master节点配置为ASYNC-MASTER,slave节点配置为SLAVE即可;
e.刷盘方式分为同步刷盘和异步刷盘,为了保证性能而不去考虑少量消息的丢失,因此同意配置为异步刷盘

3)启动集群

a 检查修改参数

启动前分别检查修改runbroker.sh和runserver.sh两个文件中的JVM参数,默认的JAVA_OPT参数的值比较大,若直接启动可能会失败,需要根据实际情况重新配置

b 分别启动两个namerser节点

nohup sh bin/mqnamesrv > /dev/null 2>&1 &

查看日志

tail -f ~/logs/rocketmqlogs/namesrv.log

c 分别启动4个broker节点

maste1

nohup sh bin/mqbroker -c
/usr/local/rocketmq/conf/2m-2s-async/broker-a.properties &

slave1

nohup sh bin/mqbroker -c
/usr/local/rocketmq/conf/2m-2s-async/broker-a-s.properties &

maste2

nohup sh bin/mqbroker -c
/usr/local/rocketmq/conf/2m-2s-async/broker-b.properties &

slave2

nohup sh bin/mqbroker -c
/usr/local/rocketmq/conf/2m-2s-async/broker-b-s.properties &

查看日志:

tail -f ~/logs/rocketmqlogs/broker.log

总结:集群环境部署,主要就是以上三个步骤,需要注意的是过程中broker配置文件的配置正确性,还需要注意一下启动前对jvm参数的检查

36、 rocketmq如何保证高可用性?

1)集群化部署NameServer。Broker集群会将所有的broker基本信息、topic信息以及两者之间的映射关系,轮询存储在每个NameServer中(也就是说每个NameServer存储的信息完全一样)。因此,NameServer集群化,不会因为其中的一两台服务器挂掉,而影响整个架构的消息发送与接收;

2)集群化部署多broker。producer发送消息到broker的master,若当前的master挂掉,则会自动切换到其他的master

cosumer默认会访问broker的master节点获取消息,那么master节点挂了之后,该怎么办呢?它就会自动切换到同一个broker组的slave节点进行消费

那么你肯定会想到会有这样一个问题:consumer要是直接消费slave节点,那master在宕机前没有来得及把消息同步到slave节点,那这个时候,不就会出现消费者不就取不到消息的情况了?

这样,就引出了下一个措施,来保证消息的高可用性

3)设置同步复制

前面已经提到,消息发送到broker的master节点上,master需要将消息复制到slave节点上,rocketmq提供两种复制方式:同步复制和异步复制

异步复制,就是消息发送到master节点,只要master写成功,就直接向客户端返回成功,后续再异步写入slave节点

同步复制,就是等master和slave都成功写入内存之后,才会向客户端返回成功

那么,要保证高可用性,就需要将复制方式配置成同步复制,这样即使master节点挂了,slave上也有当前master的所有备份数据,那么不仅保证消费者消费到的消息是完整的,并且当master节点恢复之后,也容易恢复消息数据

在master的配置文件中直接配置brokerRole:SYNC_MASTER即可

37、 rocketmq的工作流程是怎样的?

RocketMq的工作流程如下:

1)首先启动NameServer。NameServer启动后监听端口,等待Broker、Producer以及Consumer连上来

2)启动Broker。启动之后,会跟所有的NameServer建立并保持一个长连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系

3)创建Topic。创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic

4)Producer发送消息。启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic所在的Broker;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送

5)Consumer消费消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,进行消息的消费

38、 RocketMq使用哪种方式消费消息,pull还是push?

RocketMq提供两种方式:pull和push进行消息的消费

而RocketMq的push方式,本质上也是采用pull的方式进行实现的。也就是说这两种方式本质上都是采用consumer轮询从broker拉取消息的

push方式里,consumer把轮询过程封装了一层,并注册了MessageListener监听器。当轮询取到消息后,便唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉好像消息是被推送过来的

其实想想,消息统一都发到了broker,而broker又不会主动去push消息,那么消息肯定都是需要消费者主动去拉的喽~

39、 RocketMq如何负载均衡?

1)producer发送消息的负载均衡:默认会轮询向Topic的所有queue发送消息,以达到消息平均落到不同的queue上;而由于queue可以落在不同的broker上,就可以发到不同broker上(当然也可以指定发送到某个特定的queue上)

2)consumer订阅消息的负载均衡:假设有5个队列,两个消费者,则第一个消费者消费3个队列,第二个则消费2个队列,以达到平均消费的效果。而需要注意的是,当consumer的数量大于队列的数量的话,根据rocketMq的机制,多出来的队列不会去消费数据,因此建议consumer的数量小于或者等于queue的数量,避免不必要的浪费

40、 RocketMq的存储机制了解吗?

RocketMq采用文件系统进行消息的存储,相对于ActiveMq采用关系型数据库进行存储的方式就更直接,性能更高了

RocketMq与Kafka在写消息与发送消息上,继续沿用了Kafka的这两个方面:顺序写和零拷贝

1)顺序写
我们知道,操作系统每次从磁盘读写数据的时候,都需要找到数据在磁盘上的地址,再进行读写。而如果是机械硬盘,寻址需要的时间往往会比较长
而一般来说,如果把数据存储在内存上面,少了寻址的过程,性能会好很多;但Kafka 的数据存储在磁盘上面,依然性能很好,这是为什么呢?
这是因为,Kafka采用的是顺序写,直接追加数据到末尾。实际上,磁盘顺序写的性能极高,在磁盘个数一定,转数一定的情况下,基本和内存速度一致
因此,磁盘的顺序写这一机制,极大地保证了Kafka本身的性能
2)零拷贝
比如:读取文件,再用socket发送出去这一过程

buffer = File.read
Socket.send(buffer)

传统方式实现:
先读取、再发送,实际会经过以下四次复制
1、将磁盘文件,读取到操作系统内核缓冲区Read Buffer
2、将内核缓冲区的数据,复制到应用程序缓冲区Application Buffer
3、将应用程序缓冲区Application Buffer中的数据,复制到socket网络发送缓冲区
4、将Socket buffer的数据,复制到网卡,由网卡进行网络传输

传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的
重新思考传统IO方式,会注意到在读取磁盘文件后,不需要做其他处理,直接用网络发送出去的这种场景下,第二次和第三次数据的复制过程,不仅没有任何帮助,反而带来了巨大的开销。那么这里使用了零拷贝,也就是说,直接由内核缓冲区Read Buffer将数据复制到网卡,省去第二步和第三步的复制。
图片
那么采用零拷贝的方式发送消息,必定会大大减少读取的开销,使得RocketMq读取消息的性能有一个质的提升

此外,还需要再提一点,零拷贝技术采用了MappedByteBuffer内存映射技术,采用这种技术有一些限制,其中有一条就是传输的文件不能超过2G,这也就是为什么RocketMq的存储消息的文件CommitLog的大小规定为1G的原因

小结:RocketMq采用文件系统存储消息,并采用顺序写写入消息,使用零拷贝发送消息,极大得保证了RocketMq的性能

41、 RocketMq的存储结构是怎样的?

如图所示,消息生产者发送消息到broker,都是会按照顺序存储在CommitLog文件中,每个commitLog文件的大小为1G

CommitLog-存储所有的消息元数据,包括Topic、QueueId以及message

CosumerQueue-消费逻辑队列:存储消息在CommitLog的offset

IndexFile-索引文件:存储消息的key和时间戳等信息,使得RocketMq可以采用key和时间区间来查询消息

也就是说,rocketMq将消息均存储在CommitLog中,并分别提供了CosumerQueue和IndexFile两个索引,来快速检索消息

42、 RocketMq如何进行消息的去重?

我们知道,只要通过网络交换数据,就无法避免因为网络不可靠而造成的消息重复这个问题。比如说RocketMq中,当consumer消费完消息后,因为网络问题未及时发送ack到broker,broker就不会删掉当前已经消费过的消息,那么,该消息将会被重复投递给消费者去消费

虽然rocketMq保证了同一个消费组只能消费一次,但会被不同的消费组重复消费,因此这种重复消费的情况不可避免

RocketMq本身并不保证消息不重复,这样肯定会因为每次的判断,导致性能打折扣,所以它将去重操作直接放在了消费端:

1)消费端处理消息的业务逻辑保持幂等性。那么不管来多少条重复消息,可以实现处理的结果都一样

2)还可以建立一张日志表,使用消息主键作为表的主键,在处理消息前,先insert表,再做消息处理。这样可以避免消息重复消费

43、 为什么使用MQ?优点?常用场景?

  • 简答

  • 异步处理 - 相比于传统的串行、并行方式,提高了系统吞吐量。

  • 应用解耦 - 系统间通过消息通信,不用关心其他系统的处理。

  • 流量削锋 - 可以通过消息队列长度控制请求量;可以缓解短时间内的高并发请求。

  • 日志处理 - 解决大量日志传输。

  • 消息通讯 - 消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等。

  • 详答

主要是: 解耦异步削峰

  • 解耦
    A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 C 系统现在不需要了呢?A 系统负责人几乎崩溃…A 系统跟其它各种乱七八糟的系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。如果使用 MQ,A 系统产生一条数据,发送到 MQ 里面去,哪个系统需要数据自己去 MQ 里面消费。如果新系统需要数据,直接从 MQ 里消费即可;如果某个系统不需要这条数据了,就取消对 MQ 消息的消费即可。这样下来,A 系统压根儿不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况。

就是一个系统或者一个模块,调用了多个系统或者模块,互相之间的调用很复杂,维护起来很麻烦。但是其实这个调用是不需要直接同步调用接口的,如果用 MQ 给它异步化解耦。

  • 异步
    A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s,用户感觉搞个什么东西,慢死了慢死了。用户通过浏览器发起请求。如果使用 MQ,那么 A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。
  • 削峰
    减少高峰时期对服务器压力。

44、 使用MQ有什么缺点?

  • 系统可用性降低
    系统引入的外部依赖越多,越容易挂掉。本来你就是 A 系统调用 B、C、D三个系统的接口就好了,人 A、B、C、D四个系统好好的,没啥问题,你偏加个MQ进来,万一MQ挂了咋整,MQ一挂,整套系统崩溃的,你不就完了?如何保证消息队列的高可用,可以点击这里查看。
  • 系统复杂度提高
    硬生生加个MQ进来,你怎么保证消息没有重复消费?怎么处理消息丢失的情况?怎么保证消息传递的顺序性?头大头大,问题一大堆,痛苦不已。
  • 一致性问题
    A系统处理完了直接返回成功了,人都以为你这个请求就成功了;但是问题是,要是 B、C、D 三个系统那里,B、D两个系统写库成功了,结果C系统写库失败了,咋整?你这数据就不一致了。

所以消息队列实际是一种非常复杂的架构,引入MQ有很多好处,但是也得针对它带来的坏处做各种额外的技术方案和架构来规避掉,做好之后,你会发现,系统复杂度提升了一个数量级,也许是复杂了10倍。但是关键时刻,还是得用的。

45、 Kafka、ActiveMQ、RabbitMQ、RocketMQ 的区别?

ActiveMQRabbitMQRocketMQKafkaZeroMQ
单机吞吐量比RabbitMQ低2.6w/s(消息做持久化)11.6w/s17.3w/s29w/s
开发语言JavaErlangJavaScala/JavaC
主要维护者ApacheMozilla/SpringAlibabaApacheiMatix,创始人已去世
成熟度成熟成熟开源版本不够成熟比较成熟只有C、PHP等版本成熟
订阅形式点对点(p2p)、广播(发布-订阅)提供了4种:direct, topic ,Headers和fanout。fanout就是广播模式基于topic/messageTag以及按照消息类型、属性进行正则匹配的发布订阅模式基于topic以及按照topic进行正则匹配的发布订阅模式点对点(p2p)
持久化支持少量堆积支持少量堆积支持大量堆积支持大量堆积不支持
顺序消息不支持不支持支持支持不支持
性能稳定性一般较差很好
集群方式支持简单集群模式,比如’主-备’,对高级集群模式支持不好。支持简单集群,'复制’模式,对高级集群模式支持不好。常用 多对’Master-Slave’ 模式,开源版本需手动切换Slave变成Master天然的‘Leader-Slave’无状态集群,每台服务器既是Master也是Slave不支持
管理界面一般较好一般

综上,各种对比之后,有如下建议:

一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache ,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

46、 消息队列如何选型?

  1. 如果消息队列不是将要构建系统的重点,对消息队列功能和性能没有很高的要求,只需要一个快速上手易于维护的消息队列,建议使用RabbitMQ。
  2. 如果系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,需要低延迟和高稳定性,建议使用RocketMQ。
  3. 如果需要处理海量的消息,像收集日志、监控信息或是埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那Kafka是最适合的消息队列。

47、 MQ 有哪些常见问题?如何解决这些问题?

MQ 的常见问题有:

  • 消息的顺序问题

消息有序指的是可以按照消息的发送顺序来消费。

假如生产者产生了 2 条消息:M1、M2,假定 M1 发送到 S1,M2 发送到 S2,如果要保证 M1 先于 M2 被消费,怎么做?

解决方案:

  • 保证生产者 - MQServer - 消费者是一对一对一的关系

缺陷:

  • 并行度就会成为消息系统的瓶颈(吞吐量不够)
  • 更多的异常处理,比如:只要消费端出现问题,就会导致整个处理流程阻塞,我们不得不花费更多的精力来解决阻塞的问题。 (2)通过合理的设计或者将问题分解来规避。
  • 不关注乱序的应用实际大量存在
  • 队列无序并不意味着消息无序 所以从业务层面来保证消息的顺序而不仅仅是依赖于消息系统,是一种更合理的方式。
  • 消息的重复问题

造成消息重复的根本原因是:网络不可达。

所以解决这个问题的办法就是绕过这个问题。那么问题就变成了:如果消费端收到两条一样的消息,应该怎样处理?

消费端处理消息的业务逻辑保持幂等性。只要保持幂等性,不管来多少条重复消息,最后处理的结果都一样。保证每条消息都有唯一编号且保证消息处理成功与去重表的日志同时出现。利用一张日志表来记录已经处理成功的消息的 ID,如果新到的消息 ID 已经在日志表中,那么就不再处理这条消息。

48、 依托消息中间件如何实现异步?

引入了MQ后,用来的依赖关系转移了,从系统之间的依赖,变成系统都依赖MQ

A调用B,只需要向MQ发送一条消息,A就认为自己的工作完成了。不用像之前调用B一样等着B处理一堆的业务逻辑和数据库操作。

B会从MQ中读取A发送的特定消息,完成自己该做的事情。

这样就实现了A异步调用B

49、 RocketMQ消息是push还是pull?

RocketMQ没有真正意义的push,都是pull,虽然有push类,但实际底层实现采用的是长轮询机制,即拉取方式
broker端属性 longPollingEnable 标记是否开启长轮询。默认开启

源码如下:

// {@link org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage()}
// 看到没,这是一只披着羊皮的狼,名字叫PushConsumerImpl,实际干的确是pull的活。
// 拉取消息,结果放到pullCallback里
this.pullAPIWrapper.pullKernelImpl(pullCallback);

50、 为什么要主动拉取消息而不使用事件监听方式?

事件驱动方式是建立好长连接,由事件(发送数据)的方式来实时推送。

如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在 Consumer 端堆积过多,同时又不能被其他 Consumer 消费的情况。而 pull 的方式可以根据当前自身情况来 pull,不会造成过多的压力而造成瓶颈。所以采取了 pull 的方式。

相关文章

目录