2023 RocketMQ常见面试题50道

x33g5p2x  于2023-01-13 发布在 其他  
字(10.2k)|赞(0)|评价(0)|浏览(1662)

1、 当消费负载均衡consumer和queue不对等的时候会发生什么?

Consumer 和 queue 会优先平均分配,如果 Consumer 少于 queue 的个数,则会存在部分 Consumer 消费多个 queue 的情况,如果 Consumer 等于 queue 的个数,那就是一个 Consumer 消费一个 queue,如果 Consumer 个数大于 queue 的个数,那么会有部分 Consumer 空余出来,白白的浪费了。

2、 消息重复消费如何解决?

影响消息正常发送和消费的重要原因是网络的不确定性。

  • 引起重复消费的原因

  • ACK
    正常情况下在consumer真正消费完消息后应该发送ack,通知broker该消息已正常消费,从queue中剔除

当ack因为网络原因无法发送到broker,broker会认为词条消息没有被消费,此后会开启消息重投机制把消息再次投递到consumer

  • 消费模式
    在CLUSTERING模式下,消息在broker中会保证相同group的consumer消费一次,但是针对不同group的consumer会推送多次

  • 解决方案

  • 数据库表

处理消息前,使用消息主键在表中带有约束的字段中insert

  • Map

单机时可以使用map ConcurrentHashMap -> putIfAbsent guava cache

  • Redis

分布式锁搞起来。

3、 如何让 RocketMQ 保证消息的顺序消费?

首先多个 queue 只能保证单个 queue 里的顺序,queue 是典型的 FIFO,天然顺序。多个 queue 同时消费是无法绝对保证消息的有序性的。所以总结如下:

同一 topic,同一个 QUEUE,发消息的时候一个线程去发送消息,消费的时候 一个线程去消费一个 queue 里的消息。

4、 怎么保证消息发到同一个queue?

Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断 i % 2 == 0 ,那就都放到queue1里,否则放到queue2里。

for (int i = 0; i < 5; i++) {
    Message message = new Message("orderTopic", ("hello!" + i).getBytes());
    producer.send(
        // 要发的那条消息
        message,
        // queue 选择器 ,向 topic中的哪个queue去写消息
        new MessageQueueSelector() {
            // 手动 选择一个queue
            @Override
            public MessageQueue select(
                // 当前topic 里面包含的所有queue
                List<MessageQueue> mqs,
                // 具体要发的那条消息
                Message msg,
                // 对应到 send() 里的 args,也就是2000前面的那个0
                Object arg) {
                // 向固定的一个queue里写消息,比如这里就是向第一个queue里写消息
                if (Integer.parseInt(arg.toString()) % 2 == 0) {
                    return mqs.get(0);
                } else {
                    return mqs.get(1);
                }
            }
        },
        // 自定义参数:0
        // 2000代表2000毫秒超时时间
        i, 2000);
}

5、 RocketMQ如何保证消息不丢失?

首先在如下三个部分都可能会出现丢失消息的情况:

  • Producer端
  • Broker端
  • Consumer端

6、 Producer端如何保证消息不丢失

  • 采取send()同步发消息,发送结果是同步感知的。
  • 发送失败后可以重试,设置重试次数。默认3次。
    producer.setRetryTimesWhenSendFailed(10);
  • 集群部署,比如发送失败了的原因可能是当前Broker宕机了,重试的时候会发送到其他Broker上。

7、 Broker端如何保证消息不丢失

  • 修改刷盘策略为同步刷盘。默认情况下是异步刷盘的。
    flushDiskType = SYNC_FLUSH
  • 集群部署,主从模式,高可用。

8、 Consumer端如何保证消息不丢失

  • 完全消费正常后在进行手动ack确认。

9、 RocketMQ的消息堆积如何处理?

首先要找到是什么原因导致的消息堆积,是 Producer 太多了,Consumer 太少了导致的还是说其他情况,总之先定位问题。

然后看下消息消费速度是否正常,正常的话,可以通过上线更多 Consumer 临时解决消息堆积问题

10、 如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?

  • 准备一个临时的 topic
  • queue 的数量是堆积的几倍
  • queue 分布到多 Broker 中
  • 上线一台 Consumer 做消息的搬运工,把原来 Topic 中的消息挪到新的 Topic里,不做业务逻辑处理,只是挪过去
  • 上线 N 台 Consumer 同时消费临时 Topic 中的数据
  • 改 bug
  • 恢复原来的 Consumer,继续消费之前的 Topic

11、 堆积消息会超时删除吗?

不会;RocketMQ 中的消息只会在 commitLog 被删除的时候才会消失。也就是说未被消费的消息不会存在超时删除这情况。

12、 堆积的消息会不会进死信队列?

不会,消息在消费失败后会进入重试队列(%RETRY%+ConsumerGroup),18次才会进入死信队列(%DLQ%+ConsumerGroup)。

源码如下:

public class MessageStoreConfig {
    // 每隔如下时间会进行重试,到最后一次时间重试失败的话就进入死信队列了。
 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
}

13、 RocketMQ 在分布式事务支持这块机制的底层原理?

分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性

RocketMQ 4.3+ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致

RocketMQ实现方式:

  • Half Message
    预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
  • 检查事务状态
    Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。
  • 超时
    如果超过回查次数,默认回滚消息。

也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。

14、 RocketMQ 是如何保证数据的高容错性的?

  • 在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker
  • 如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用
  • 如果上次失败的Broker可用那么还是会选择该Broker的队列
  • 如果上述情况失败,则随机选择一个进行发送
  • 在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间

其实就是send消息的时候queue的选择。源码在如下:

org.apache.rocketmq.client.latency.MQFaultStrategy#selectOneMessageQueue()

15、 RocketMQ如何分布式存储海量消息的?

RocketMQ 进程一般称为 Broker,集群部署的各个 Broker收到不同的消息,然后存储在自己本地的磁盘文件中。

16、 任何一台 Broker 突然宕机了怎么办?还能使用吗?消息会不会丢?

RocketMQ的解决思路是Broker主从架构以及多副本策略。

Master收到消息后会同步给Slave,这样一条消息就不止一份了,Master宕机了还有slave中的消息可用,保证了MQ的可靠性和高可用新。

17、 怎么知道有哪些 Broker ?如何知道要连那个Broker?

有个NameServer的概念,是独立部署在几台机器上的,然后所有的Broker都会把自己注册到NameServer上去,NameServer就知道集群里有哪些Broker了!

发送消息到 Broker,会找 NameServer 去获取路由信息
系统要从 Broker 获取消息,也会找 NameServer 获取路由信息,去找到对应的 Broker 获取消息。

18、 NameServer到底可以部署几台机器?为什么要集群化部署?

部署多台,保证高可用性。

集群化部署是为了高可用性,NameServer 是集群里非常关键的一个角色,如果部署一台 NameServer,宕机会导致 RocketMQ 集群出现故障,所以 NameServer 一定会多机器部署,实现一个集群,起到高可用的效果。

19、 系统如何从NameServer获取Broker信息?

系统主动去NameServer上拉取Broker信息及其他相关信息。

20、 如果Broker宕了,NameServer是怎么感知到的?

Broker会定时(30s)向NameServer发送心跳
然后 NameServer会定时(10s)运行一个任务,去检查一下各个Broker的最近一次心跳时间,如果某个Broker超过120s都没发送心跳了,那么就认为这个Broker已经挂掉了。

21、 Broker挂了,系统是怎么感知到的?

主要是通过拉取NameServer上Broker的信息。
但是,因为Broker心跳、NameServer定时任务、生产者和消费者拉取Broker信息,这些操作都是周期性的,所以不会实时感知,所以存在发送消息和消费消息失败的情况,现在 我们先知道,对于生产者而言,他是有 一套容错机制的。

22、 Master Broker 是如何将消息同步给 Slave Broker 的?

RocketMQ 自身的 Master-Slave 模式采取的是 Pull 模式拉取消息。

23、 消费消息时是从Master获取还是Slave获取?

可能从Master Broker获取消息,也有可能从Slave Broker获取消息

  1. 消费者的系统在获取消息的时候会先发送请求到Master Broker上去,请求获取一批消息,此时Master Broker是会返回一批消息给消费者系统的
  2. Master Broker在返回消息给消费者系统的时候,会根据当时Master Broker的 负载情况和Slave Broker的 同步情况,向消费者系统建议下一次拉取消息的时候是从Master Broker拉取还是从Slave Broker拉取。

24、 如果 Slave Broker 挂掉了,会对整个系统有影响吗?

有一点影响,但是影响不太大,因为消息写入全部是发送到Master Broker的,获取消息也可以Master获取,少了Slave Broker,会导致所有读写压力都集中在Master Broker

25、 Master Broker 突然挂了,这样会怎么样?

RocketMQ 4.5 版本之前,用 Slave Broker 同步数据,尽量保证数据不丢失,但是一旦 Master 故障了,Slave 是没法自动切换成 Master 的。
所以在这种情况下,如果 Master Broker 宕机了,这时就得手动做一些运维操作,把 Slave Broker 重新修改一些配置,重启机器给调整为Master Broker,这是有点麻烦的,而且会导致中间一段时间不可用。

RocketMQ 4.5之后支持了一种叫做 Dledger 机制,基于 Raft 协议实现的一个机制。
我们可以让一个 Master Broker 对应多个 Slave Broker, 一旦 Master Broker 宕机了,在多个 Slave 中通过 Dledger 技术 将一个 Slave Broker 选为新的 Master Broker 对外提供服务。
在生产环境中可以是用 Dledger 机制实现自动故障切换,只要10秒或者几十秒的时间就可以完成

26、 为什么使用rocketMQ

性能: TPS 10000没问题

顺序消费: 可以保证一个队列里面的消息顺序消费,比如同一个订单的消息可以放到同一个队列这样就达到了顺序消费,如果想 保证全局顺序,设置一个队列

事务消息: 添加事务表,实现TransactionListener,在本地事务提交的时候往事务表插入一条数据,mq回查消息,如果存在就commit,不存在就rollBack,回查次数自己设置

思想:利用两阶段提交+补偿机制

27、 消息队列有哪些消息模型

队列模型: 一条消息被一个消费组下面的一个消费者消费 对应集群消费

发布/订阅模型:一条消息被消费组下面的所有消费者消费 对应广播消费

28、 如何处理消息的重复问题

业务幂等: 保证业务消费一条和消费多条是幂等的

消息去重:为每条消息创建一个唯一的key,不能重复消费,比如设置唯一索引,将消息插入数据库做判断

29、 怎么处理消息积压

消费者扩容:如果队列的个数大于消费者的个数,可以对消费者进行扩容,提高消费能力

迁移消息到临时topic:如果队列的个数小于消费者的个数,增加消费者也不会提高消费能力,新建一个临时的topic,用几个消费者直接将消息丢到临时的topic,然后创建几个消费者去消费临时的topic,这样也是间接的加大消费能力

30、 怎么保证消息顺序

部分消息顺序:将消息都发送到同一个队列

全局消息顺序:配置topic为1个队列

31、 如何实现消息过滤

tag过滤

sql表达式过滤

filter server自定义函数过滤

32、 RocketMQ怎么实现延时消息的

发送消息的时候设置延迟级别,broker收到延时消息的时候会先将消息发送到SCHEDULE_JOB_XXX的相应时间段的队列中,然后通过一个定时任务轮询这些队列,如果达到时间了就将消息发送到目标topic的队列,然后消费者就可以正常消费消息

33、 事务消息怎么实现?

  • 1、Producer 向 broker 发送半消息
  • 2、Producer 端收到响应,消息发送成功,此时消息是半消息,标记为 “不可投递” 状态,Consumer 消费不了。
  • 3、Producer 端执行本地事务。
  • 4、正常情况本地事务执行完成,Producer 向 Broker 发送 Commit/Rollback,如果是 Commit,Broker 端将半消息标记为正常消息,Consumer 可以消费,如果是 Rollback,Broker 丢弃此消息。
  • 5、异常情况,Broker 端迟迟等不到二次确认。在一定时间后,会查询所有的半消息,然后到 Producer 端查询半消息的执行情况。
  • 6、Producer 端查询本地事务的状态
  • 7、根据事务的状态提交 commit/rollback 到 broker 端。(5,6,7 是消息回查)
  • 8、消费者段消费到消息之后,执行本地事务,执行本地事务。

34、 死信队列了解吗?

消息消费失败之后,会自动进行消息重试,如果达到了重试的次数仍然消费失败,会将该消息发送到死信队列,死信队列的消息不会被消费者正常消费,有效期为3天,3天之后自动删除,一个死信队列对应一个group id,控制台支持对死信消息的查询、重发、导出

35、 如何保证RocketMQ的高可用

首先broker是集群部署,每一个master下面挂一个slave

读的高可用:如果master挂了,消费者还可以从slave读取消息

写的高可用:由broker集群保证,单个节点出现问题不影响发送消息到broker,如果master挂了,可以修改slave的配置文件为master,然后启动承载写的功能

36、 RocketMQ为什么不采用zookeeper做注册中心?

基于可用性来考虑,zookeeper满足的是CP

基于性能来考虑,nameserver本身的实现是很轻量级,可以通过增加机器的方式水平扩展,提升集群的抗压能力

消息发送应该弱依赖于nameserver,当生产者第一次发送消息,从nameserver获取到broker地址然后缓存到本地,所以nameserver集群挂了之后也不会影响生产者发送消息

37、 Broker是怎么保存数据的呢?

commitlog文件: 消息的主体内容

ConsumeQueue文件:基于topic的commitLog索引文件

IndexFile:提供根据消息key或者时间区间查询消息

利用的操作系统高效读写的方式:PageCache 、 顺序读写、零拷贝

38、 消息刷盘怎么实现的?

同步刷盘:消息到达Broker内存之后将消息刷盘到commitLog中并返回生产者发送成功

异步刷盘:消息到达Broker内存之后返回生产者发送成功,并唤醒后台线程将数据刷盘到commitLog日志文件中,只是唤醒,不确定线程执行的时机

刷盘的最终实现是调用NIO的MappedByteBuffer.force() 将数据刷新到磁盘

39、 RocketMQ的负载均衡是如何实现的?

生产者端的负载均衡:索引递增取模 ,如果 x1m0n1x 为true(默认为false),将会规避上次发送失败的broker

消费者端的负载均衡:

40、 RocketMQ消息长轮询?

Consumer拉取消息,如果队列里面没有消息不会立即返回,而是维持一个PullRequest,另外有一个线程会不断的检查队列是否有消息,如果有则返回,如果到了阻塞的时间还没有消息则返回

41、 RocketMQ 是什么?

RocketMQ 是阿里巴巴在 2012 年开源的分布式消息中间件,目前已经捐赠给 Apache 软件基金会,并成为 Apache 的顶级项目。作为经历过多次阿里巴巴双十一这种超级工程的洗礼并有稳定出色表现的国产中间件,具有高性能、低延时和高可靠等特性。主要用来提升性能、系统解耦、流量肖峰等。

42、 RocketMQ 有和特点?

1)灵活可扩展性

RocketMQ 天然支持集群,其核心四组件(Name Server、Broker、Producer、Consumer)每一个都可以在没有单点故障的情况下进行水平扩展。

2)海量消息堆积能力

采用零拷贝原理实现超大的消息的堆积能力,据说单机已可以支持亿级消息堆积,而且在堆积了这么多消息后依然保持写入低延迟。

3)支持顺序消息

可以保证消息消费者按照消息发送的顺序对消息进行消费。顺序消息分为全局有序和局部有序,一般推荐使用局部有序,即生产者通过将某一类消息按顺序发送至同一个队列来实现。

4)多种消息过滤方式

消息过滤分为在服务器端过滤和在消费端过滤。服务器端过滤时可以按照消息消费者的要求做过滤,优点是减少不必要消息传输,缺点是增加了消息服务器的负担,实现相对复杂。消费端过滤则完全由具体应用自定义实现,这种方式更加灵活,缺点是很多无用的消息会传输给消息消费者。

5)支持事务消息

RocketMQ 除了支持普通消息,顺序消息之外还支持事务消息,这个特性对于分布式事务来说提供了又一种解决思路。

6)回溯消费

回溯消费是指消费者已经消费成功的消息,由于业务上需求需要重新消费,RocketMQ 支持按照时间回溯消费,时间维度精确到毫秒,可以向前回溯,也可以向后回溯。

43、 几种常见 MQ 的比较?

Kafka、ActiveMQ、RabbitMQ 以及 RocketMQ 各自的优缺点:

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级万级十万级十万级
Topic数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性毫秒级微秒级,这是 RabbitMQ 的一大特点,延迟最低毫秒级毫秒级
可用性高,基于主从架构实现高可用非常高非常高
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同RocketMQ

一般的业务系统要引入 MQ,早起大家都用 ActiveMQ,但是现在已经使用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;

后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;

不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前 RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐用 RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄。

所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。

如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。

44、 RocketMQ 的角色构成?

  • 生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
  • 消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
  • 消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
  • 名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

45、 RocketMQ 执行流程?

1)启动 Namesrv 后开始监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

2)Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包。

3)收发消息前,先创建 Topic。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上,也可以在发送消息时自动创建 Topic。

4)Producer 向该 Topic 发送消息。

5)Consumer 消费该 Topic 的消息。

46、 RocketMQ 的消息结构?

public class Message implements Serializable {
    // 表示消息要到的发送主题(必填)
    private String topic;
    // 消息的标记,完全由应用设置,RocketMQ不做任何处理
    private int flag;
    // 消息属性,主要存储一些消息的元数据信息
    private Map<String, String> properties;
    // 消息的内容,这是一个字节数组,序列化方式由应用决定
    private byte[] body;
    // 事务id,仅在事务消息中使用到 
    private String transactionId;
}

47、 RocketMQ 为何这么快?

是因为使用了顺序存储、Page Cache 和异步刷盘。

1)在写入 commitLog 的时候是顺序写入的,这样比随机写入的性能有巨大提升。

2)写入 commitLog 的时候并不是直接写入磁盘,而是先写入操作系统的 PageCache。最后由操作系统异步将缓存中的数据刷到磁盘。

48、 消费者消费的几种模式?

RocketMQ 消费者有集群消费广播消费两种消费模式。

  • 集群消费

一个 Consumer Group 中的各个 Consumer 实例分摊去消费消息,即一条消息只会投递到一个 Consumer Group 下面的一个实例。

  • 广播消费

消息将对一个 Consumer Group 下的各个 Consumer 实例都投递一遍。即即使这些 Consumer 属于同一个Consumer Group ,消息也会被 Consumer Group 中的每个 Consumer 都消费一次。

49、 消费者获取消息有几种模式?

消费者获取消息有两种模式:推送模拉取模式

  • PushConsumer

即推送模式,消息的能及时被消费。使用非常简单,内部已处理如线程池消费、流控、负载均衡、异常处理等等的各种场景。

但 RocketMQ 没有真正意义的 push,都是 pull,虽然有 push 类,但实际底层实现采用的是长轮询机制,即拉取方式。

  • PullConsumer

即拉取模式,应用主动控制拉取的时机,怎么拉取,如何消费等。主动权更高,但要自己处理各种场景。

50、 如何获取 Topic-Broker 的映射关系?

Producer 和 Consumer 启动时,也都需要指定 Namesrv 的地址,从 Namesrv 集群中选一台建立长连接。

生产者每 30 秒从 Namesrv 获取 Topic 跟 Broker 的映射关系,更新到本地内存中。然后再跟 Topic 涉及的所有 Broker 建立长连接,每隔 30 秒发一次心跳。

相关文章

目录