RocketMQ进击(七)盘一盘RocketMQ的重试机制和生产端重试与消费端重试(异常重试和超时重试)

x33g5p2x  于2021-12-19 转载在 其他  
字(13.2k)|赞(0)|评价(0)|浏览(583)

楔子:翻了帖子两三天,硬是没有找到哪个帖子能证明生产端的消息重试是确实重试了的。大多要么是对概念、源码说明了一下,或者把实现示例贴贴,但基本并没有有效测试证明。想了想,还是自己来捋一捋这 RocketMQ 的消息重试机制。

由于 MQ 经常处于庞大的分布式系统中,考虑到网络波动、服务宕机、程序异常等因素,很可能会出现消息发送或者消费失败的问题。因此,如果没有消息重试,就有可能造成消息丢失,最终影响到系统某些业务或流程。所以,大部分消息中间件都对消息重试提供了很好的支持。RocketMQ 消息重试分为两种:**Producer 发送重试 **和 Consumer 消费重试

1. 生产端重试

**也叫消息重投。**一般由于网络抖动等原因,Producer 向 Broker 发送消息时没有成功,导致最终 Consumer 无法消费消息,此时 RocketMQ 会自动进行消息重试/重投。我们可以手动设置发送失败时的重试次数。默认为 2 次,但加上程序本身的 1 次发送,如果失败,总共会发送 3 次,也就是 N + 1 次。N 为 retryTimesWhenSendFailed。

1.2. 源码分析

验证前,我们先来撸一下源码:

private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // ...源码省略...
    // 获取当前时间
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    // 去服务器看下有没有主题消息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if(topicPublishInfo != null && topicPublishInfo.ok()) {
        // ...源码省略...
        // 通过这里可以很明显看出,如果是同步消息,则重试 变量值+1次;如果不是同步发送消息,那么消息重试只有1次
        int timesTotal = communicationMode == CommunicationMode.SYNC?1 + this.defaultMQProducer.getRetryTimesWhenSendFailed():1;
        // 重试累计次数
        int times = 0;
        String[] brokersSent = new String[timesTotal];

        while(true) {
            label129: {
                String info;
                // 如果重试累计次数 小于 总重试次数阀值,则轮询获取服务器主题消息
                if(times < timesTotal) {
                    info = null == mq?null:mq.getBrokerName();
                    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                    if(mqSelected != null) {
                        mq = mqSelected;
                        brokersSent[times] = mqSelected.getBrokerName();

                        long endTimestamp;
                        try {
                            beginTimestampPrev = System.currentTimeMillis();
                            if(times > 0) {
                                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                            }
                            // 消息投送耗时
                            long costTime = beginTimestampPrev - beginTimestampFirst;
                            // 如果 消息投送耗时 小于等于 超时时间,则向 Broker 进行消息重投;否则,超时
                            if(timeout >= costTime) {
                                // 调用sendKernelImpl开始发送消息
                                sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                                // ...源码省略...
                                default:
                                    break label129;
                                }
                            }

                            // 设置超时
                            callTimeout = true;
                        } catch (RemotingException var26) {
                            // ...源码省略...
                            // 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                            break label129;
                        } catch (MQClientException var27) {
                            // ...源码省略...
                            // 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                            break label129;
                        } catch (MQBrokerException var28) {
                            // ...源码省略...
                            // 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                            switch(var28.getResponseCode()) {
                            case 1:
                            case 14:
                            case 16:
                            case 17:
                            case 204:
                            case 205:
                                break label129;
                            default:
                                if(sendResult != null) {
                                    return sendResult;
                                } else {
                                    throw var28;
                                }
                            }
                        } catch (InterruptedException var29) {
                            // 源码省略......
                            throw var29;
                        }
                    }
                }

                if(sendResult != null) {
                    return sendResult;
                }

                // 重试日志
                info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", new Object[]{Integer.valueOf(times), Long.valueOf(System.currentTimeMillis() - beginTimestampFirst), msg.getTopic(), Arrays.toString(brokersSent)});
                info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                // 如果是消息发送/重试/重投超时,则抛出异常。如果还有重试次数,该异常不会再对该消息进行重试
                if(callTimeout) {
                    throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                }

                // ...源码省略...
                // 默认走 MQClientException 异常
                throw mqClientException;
            }
            // 重试次数累加
            ++times;
        }
    } else {
        // 如果没有可用 Topic,且 NamesrvAddr 地址列表不为空,则构建 MQClientException,没有重试
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if(null != nsList && !nsList.isEmpty()) {
            throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
        } else {
            // 如果没有可用 Topic,且 NamesrvAddr 地址列表为空,则构建 MQClientException,没有重试
            throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
        }
    }
}

从 DefaultMQProducer 源码分析可以看出:

生产者重试几次?

  • 同步发送:默认 retryTimesWhenSendFailed 是 2次重试,所以除了正常调用 1 次外,发送消息如果失败了会重试 2 次;超时异常不会重试
  • 异步发送:不会重试(调用总次数等于1)
  • 单向发送:oneway 没有任何保证

什么时候重试?

  • 当使用 RocketMQ 的 send() 方法发送消息时,出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投(见上面源码分析)。需要注意的是消息重试/重投是发生在 RocketMQ 内部,我们所能干预的是重试次数等。
  • 在多条消息发送的 for 循环下的 try catch 可以实现服务降级,防止前一条消息的发送失败阻断后面的消息发送,但是起不到消息重试的作用,原因如上,消息重试/重投是发生在 RocketMQ 内部。

怎么重试?
每次重试都会重新进行负载均衡(会考虑发送失败的因素),使用 selectOneMessageQueue 重新选择 MessageQueue,这样增大发送消息成功的可能性。

隔多久重试?
立即重试,中间没有单独的间隔时间。见源码 真死循环 while(true) 的 sendDefaultImpl 方法,里面有个 label129 标记,只要上一次发送消息后被标记为 label129,就会立马进行下一次消息重投,没有时间间隔。

1.3. 代码示例

配置一个不存在的 nameServer 地址,实现一个设置重试次数 retryTimesWhenSendFailed 为 2,但总共会重试/重投 3 次(因为 N + 1)的消息生产者:

public class RetryMultiMqProducer {

    // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
    private static final String MQ_CONFIG_TAG_RETRY = "PID_MEIWEI_SMS_RETRY_PRODUCER";

    public static void main(String[] args) throws Exception {
        // 创建一个 producer 生产者
        DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-retry");
        // 指定 NameServer 地址列表,多个 nameServer 地址用半角分号隔开。此处应改为实际 NameServer 地址
        // NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
        producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
        // 设置重试次数,默认情况下是2次重试
        // 虽然默认2次,但算上程序本身的1次,其实有3次机会,即如果本身的1次发送成功,2次重试机制就不重试了;如果本身的1次发送失败,则再执行这2次重试机会
        producer.setRetryTimesWhenSendFailed(2);
        // 设置超时时长,默认情况下是3000毫秒,即3秒
        producer.setSendMsgTimeout(1000);
        // 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
        producer.start();

        // 循环发送MQ测试消息
        String content = "";
        for (int i = 0; i < 5; i++) {
            try {
                content = "【MQ测试消息】测试消息 " + i;
                // 构建一条消息
                Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_RETRY, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息,发送消息到一个 Broker。默认以同步方式发送
                SendResult sendResult = producer.send(message);

                // 消息发送成功
                System.out.printf("Send MQ message success! Topic: %s, Tag: %s, MsgId: %s, Message: %s %n",
                        message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
            } catch (Exception e) {
                // 只有当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                System.out.printf(new Date() + ", 异常信息:%s %n", e);
                Thread.sleep(1000);
            }
        }

        // 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
        producer.shutdown();
    }
}

1.4. 验证结果

正常开启 RocketMQ 服务,启动生产者:

从生产者输出的日志可以看到,后面的 4 条消息各重试了 3 次:
org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [1]ms, Topic: TOPIC_MEIWEI_SMS_NOTICE_TEST, BrokersSent: [YYW-SH-PC-1454, YYW-SH-PC-1454, YYW-SH-PC-1454]

再看看 RocketMQ 日志。如果是 Windows 安装的 RocketMQ,且使用的是默认日志配置,则可以在路径 C:\Users\yourname\logs\rocketmqlogs 下查看 rocketmq_client.log

日志文件 rocketmq_client.log 输出了源码中的日志:sendKernelImpl exception, resend at once, InvokeID

2. 消费端重试

Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

  1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
  2. 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

**只有在消息模式为 MessageModel.CLUSTERING 集群模式时,Broker 才会自动进行重试,广播消息模式下不会自动进行重试。**消费者消费消息后,需要给 Broker 返回消费状态。以 MessageListenerConcurrently 监听器为例,Consumer 消费完成后需要返回 ConsumeConcurrentlyStatus 消费状态。

RocketMQ 会为每个消费组都设置一个 Topic 名称为 “%RETRY%+consumerGroup” 的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为 “SCHEDULE_TOPIC_XXXX” 的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。

2.1. 源码分析

ConsumeConcurrentlyStatus 有 消费成功 和 消费失败 两种状态:

public enum ConsumeConcurrentlyStatus {
    // 消费成功
    CONSUME_SUCCESS,
    // 消费失败,需要稍后重新消费
    RECONSUME_LATER;

    private ConsumeConcurrentlyStatus() {
    }
}

Consumer 端的重试包括两种情况:

  1. 异常重试:由于 Consumer 端逻辑出现了异常,导致返回了 RECONSUME_LATER 状态,那么 Broker 就会在一段时间后尝试重试。
  2. 超时重试:如果 Consumer 端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker 就会认为 Consumer 消费超时,此时会发起超时重试。

因此,如果 Consumer 端正常消费成功,一定要返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态。

2.2. 生产者

public class RetryExceptionMqProducer {

    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    private static final String MQ_CONFIG_TAG_RETRY = "PID_MEIWEI_SMS_RETRY_EXCEPTION";

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-retry");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        String content = "【MQ测试消息】测试消息 ";
        Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_RETRY, content.getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送消息,发送消息到一个 Broker。默认以同步方式发送
        SendResult sendResult = producer.send(message);
        System.out.printf("Send MQ message success! Topic: %s, Tag: %s, MsgId: %s, Message: %s %n",
                message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));

        producer.shutdown();
    }
}

2.3. 异常重试

实现一个设置了最大重试次数 maxReconsumeTimes 为 4,但业务异常中有重试阀值 3,满足阀值条件,则返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 不再重试的消费者:

public class Retry4ExceptionMqConsumer {
    // Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_RETRY_EXCEPTION";

    public static void main(String[] args) throws Exception {
        // 声明并初始化一个 consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-retry-exception");
        // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
        consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
        // 设置最大重试数次
        consumer.setMaxReconsumeTimes(4);

        // 注册一个监听器,主要进行消息消费的逻辑处理
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 获取消息
                MessageExt msg = list.get(0);

                try {
                    // 获取重试次数
                    int reconsumeTimes = msg.getReconsumeTimes() + 1;
                    System.out.printf(new Date() + ",第 %s 次轮询消费 %n", reconsumeTimes);

                    // 模拟业务逻辑。此处为超过最大重试次数,自动标记消息消费成功
                    if (reconsumeTimes >= 3) {
                        System.out.printf(new Date() + ",超过最大重试次数,自动标记消息消费成功 Topic: %s, Tags: %s, Message: %s %n",
                                msg.getTopic(), msg.getTags(), new String(msg.getBody()));
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }

                    // 模拟异常发生
                    int num = 1 / 0;
                    System.out.printf(new Date() + ",第 %s 次正常消费 %n", reconsumeTimes);

                    // 返回消费状态
                    // CONSUME_SUCCESS 消费成功
                    // RECONSUME_LATER 消费失败,需要稍后重新消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 获取重试次数
                    int reconsumeTimes = msg.getReconsumeTimes() + 1;
                    System.out.printf(new Date() + ",第 %s 次重试消费,异常信息:%s %n", reconsumeTimes, e);
                    // 每次重试时间间隔遵循延时等级递增:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        // 调用 start() 方法启动 consumer
        consumer.start();
        System.out.println("Retry Consumer Started.");
    }
}

2.3.1 测试结果

2.3.2 实验总结

  1. 如果 maxReconsumeTimes 的指定重试次数 大于 业务重试最大重试阀值 reconsumeTimes,则完成业务逻辑处理,返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 后,不再重试;若此处没有返回 CONSUME_SUCCESS,则还会继续重试,到指定重试次数截止,即便没有返回 CONSUME_SUCCESS
  2. 如果 maxReconsumeTimes 的指定重试次数 小于 业务重试最大重试阀值 reconsumeTimes,则重试完指定重试次数后不再重试,即便没有返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS;若没有设定业务重试最大重试阀值校验也同理
  3. 如果没有设置 maxReconsumeTimes 的指定重试次数,也没有设定业务重试最大重试阀值处返回 CONSUME_SUCCESS,则会一直发起重试;如果重试 16 次还是没有返回 CONSUME_SUCCESS 成功状态,就会认为消息消费不了,丢进死信队列
  4. 以上重试时间间隔遵循延时等级逐次递增:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;这种“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小
  5. 重试期间,即便关闭然后再次找开当前消费者,也能继续收到重试消息/进度状态
  6. 默认重试次数:Producer 生产端重试默认是 2 次,而 Consumer 消费端重试默认是 16 次
  7. 失效情况:Producer 生产端在异步发送情况下重试失效;而 Consumer 消费端在广播消费模式下重试失效

2.4. 超时重试

这里的超时重试并非真正意义上的超时,它是说获取消息后,因为某种原因没有给 RocketMQ 返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 return ConsumeConcurrentlyStatus.RECONSUME_LATER。这种情况 MQ 会无限制的发送消息给消费端,因为 RocketMQ 会认为该消息没有发送,所以会一直发送。

2.4.1 代码示例

public class Retry4TimeoutMqConsumer {
    // Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
    private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
    private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_RETRY_TIMEOUT";

    public static void main(String[] args) throws Exception {
        // 创建一个 consumer 消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-retry-timeout");
        // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
        consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
        // 设置消费超时时间(默认值15L,为15分钟)
        consumer.setConsumeTimeout(1L);
        // 设置最大重试数次
        consumer.setMaxReconsumeTimes(2);

        // 注册一个监听器,主要进行消息消费的逻辑处理
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 获取消息
                MessageExt msg = list.get(0);
                try {
                    // 获取重试次数
                    int reconsumeTimes = msg.getReconsumeTimes() + 1;
                    if (reconsumeTimes == 1) {
                        // 模拟操作:设置一个大于上面已经设置的消费超时时间 来验证超时重试场景(setConsumeTimeout(1L))
                        System.out.println("---------- 服务暂停 ---------- " + new Date());
                        Thread.sleep(1000 * 60 * 2);
                    } else {
                        System.out.println("---------- 重试消费 ---------- " + new Date());
                    }

                    System.out.printf(new Date() + " 第 %s 次重试消费:Topic: %s, Tags: %s, MsgId: %s, Message: %s %n",
                            reconsumeTimes, msg.getTopic(), msg.getTags(), msg.getMsgId(), new String(msg.getBody()));

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    System.out.printf(new Date() + ",异常信息:%s %n", e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        // 调用 start() 方法启动 consumer
        consumer.start();
        System.out.println("Retry Timeout Consumer Started.");
    }
}

2.4.2 测试结果

测试期间,当控制台输出日志 “服务暂停” 后,关闭当前消费者:

再次开启该消费者:

2.4.3 实验总结

  1. 重试期间,关闭当前消费者,再开启该消费者,合理区间内该消费者也能再次收到重试消息或者消费的进度状态
  2. 如果设置了指定最大重试次数,但有业务重试次数阀值校验中返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 后,便不再重试

相关文章