RocketMQ高手之路系列之七:RocketMQ之消息发送(四)

x33g5p2x  于2021-12-19 转载在 其他  
字(4.7k)|赞(0)|评价(0)|浏览(552)

引言

前面我们介绍了RocketMQ在发送消息之前做了一系列的准备事项,其中包括路由选择、队列选择以及坏点Broker退避等等。本文将开始阐述RocketMQ的消息发送过程。

  • 消息发送基本流程
  • 总结

一、消息发送基本流程

下面我们看下进行消息发送的最核心的API,即DefaultMQProducerImpl类中的sendKernelImpl方法如下所示(相关字段注释如下):

private SendResult sendKernelImpl(
//需要发送的消息
final Message msg,
									  //消息需要发送到的消息队列
                                      final MessageQueue mq,
                                      //消息发送模式
                                      final CommunicationMode communicationMode,
                                      //异步消息回调
                                      final SendCallback sendCallback,
                                      //主题路由信息
                                      final TopicPublishInfo topicPublishInfo,
                                      //超时时间
                                      final long timeout)

涉及的具体步骤如下所示:

我们具体看下源码,相关注释如下所示:

//根据选择的MessageQueue获取对应Broker地址
 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
 		//如果为获取到
        if (null == brokerAddr) {
        	//从NameServer进行主动更新TOPIC信息
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
        }

进行全局ID设置,源码如下:

if (!(msg instanceof MessageBatch)) {
 					//设置全局唯一ID
                    MessageClientIDSetter.setUniqID(msg);
                }
				//消息提的默认大小大小超过4K,则进行zip压缩,并设置消息系统标记
                int sysFlag = 0;
                boolean msgBodyCompressed = false;
                if (this.tryToCompressMessage(msg)) {
                    sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
                    msgBodyCompressed = true;
                }
				//如果事务,则进行系统事务标记
                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
                }

是否执行钩子函数,源码如下所示:

//判断是否进行消息发送钩子函数注册了,为一个列表
if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducer(this);
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                    if (isTrans != null && isTrans.equals("true")) {
                        context.setMsgType(MessageType.Trans_Msg_Half);
                    }

                    if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                        context.setMsgType(MessageType.Delay_Msg);
                    }
                    this.executeSendMessageHookBefore(context);
                }

进行消息发送请求包的构建,源码如下所示:

//消息发送请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//设置消息生产组 requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
//设置主题名称 
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//设置默认 在单个Broker默认队列数 
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                //设置系统标记
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                requestHeader.setBatch(msg instanceof MessageBatch);
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }

                    String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                    if (maxReconsumeTimes != null) {
//设置最大重试次数 
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                    }
                }

最后根据communicationMode,进行消息发送模式的选择,其中包括了同步发送方式、异步发送方式以及单向发送方式,大致的源码如下所示:

SendResult sendResult = null;
                switch (communicationMode) {
                	//异步方式
                    case ASYNC:
                        Message tmpMessage = msg;
                        if (msgBodyCompressed) {
                            //If msg body was compressed, msgbody should be reset using prevBody.
                            //Clone new message using commpressed message body and recover origin massage.
                            //Fix bug:https://github.com/apache/rocketmq-externals/issues/66
                            tmpMessage = MessageAccessor.cloneMessage(msg);
                            msg.setBody(prevBody);
                        }
                        long costTimeAsync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeAsync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            tmpMessage,
                            requestHeader,
                            timeout - costTimeAsync,
                            communicationMode,
                            sendCallback,
                            topicPublishInfo,
                            this.mQClientFactory,
                            this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                            context,
                            this);
                        break;
                    //单向以及同步
                    case ONEWAY:
                    case SYNC:
                        long costTimeSync = System.currentTimeMillis() - beginStartTime;
                        if (timeout < costTimeSync) {
                            throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
                        }
                        sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                            brokerAddr,
                            mq.getBrokerName(),
                            msg,
                            requestHeader,
                            timeout - costTimeSync,
                            communicationMode,
                            context,
                            this);
                        break;
                    default:
                        assert false;
                        break;
                }

二、总结

本文主要阐述了消息发送的基本流程。其中包括了获取Broker地址、设置全局ID、构建请求包以及发送消息了。在下篇文章中我们继续来看消息发送的底层流程。

相关文章