前面我们介绍了RocketMQ
在发送消息之前做了一系列的准备事项,其中包括路由选择、队列选择以及坏点Broker
退避等等。本文将开始阐述RocketMQ
的消息发送过程。
下面我们看下进行消息发送的最核心的API,即DefaultMQProducerImpl
类中的sendKernelImpl
方法如下所示(相关字段注释如下):
private SendResult sendKernelImpl(
//需要发送的消息
final Message msg,
//消息需要发送到的消息队列
final MessageQueue mq,
//消息发送模式
final CommunicationMode communicationMode,
//异步消息回调
final SendCallback sendCallback,
//主题路由信息
final TopicPublishInfo topicPublishInfo,
//超时时间
final long timeout)
涉及的具体步骤如下所示:
我们具体看下源码,相关注释如下所示:
//根据选择的MessageQueue获取对应Broker地址
String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
//如果为获取到
if (null == brokerAddr) {
//从NameServer进行主动更新TOPIC信息
tryToFindTopicPublishInfo(mq.getTopic());
brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
}
进行全局ID设置,源码如下:
if (!(msg instanceof MessageBatch)) {
//设置全局唯一ID
MessageClientIDSetter.setUniqID(msg);
}
//消息提的默认大小大小超过4K,则进行zip压缩,并设置消息系统标记
int sysFlag = 0;
boolean msgBodyCompressed = false;
if (this.tryToCompressMessage(msg)) {
sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
msgBodyCompressed = true;
}
//如果事务,则进行系统事务标记
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
是否执行钩子函数,源码如下所示:
//判断是否进行消息发送钩子函数注册了,为一个列表
if (this.hasSendMessageHook()) {
context = new SendMessageContext();
context.setProducer(this);
context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
context.setCommunicationMode(communicationMode);
context.setBornHost(this.defaultMQProducer.getClientIP());
context.setBrokerAddr(brokerAddr);
context.setMessage(msg);
context.setMq(mq);
String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (isTrans != null && isTrans.equals("true")) {
context.setMsgType(MessageType.Trans_Msg_Half);
}
if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
context.setMsgType(MessageType.Delay_Msg);
}
this.executeSendMessageHookBefore(context);
}
进行消息发送请求包的构建,源码如下所示:
//消息发送请求头
SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
//设置消息生产组 requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
requestHeader.setTopic(msg.getTopic());
//设置主题名称
requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
//设置默认 在单个Broker默认队列数
requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
requestHeader.setQueueId(mq.getQueueId());
//设置系统标记
requestHeader.setSysFlag(sysFlag);
requestHeader.setBornTimestamp(System.currentTimeMillis());
requestHeader.setFlag(msg.getFlag());
requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
requestHeader.setReconsumeTimes(0);
requestHeader.setUnitMode(this.isUnitMode());
requestHeader.setBatch(msg instanceof MessageBatch);
if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
if (reconsumeTimes != null) {
requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
}
String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
if (maxReconsumeTimes != null) {
//设置最大重试次数
requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
}
}
最后根据communicationMode
,进行消息发送模式的选择,其中包括了同步发送方式、异步发送方式以及单向发送方式,大致的源码如下所示:
SendResult sendResult = null;
switch (communicationMode) {
//异步方式
case ASYNC:
Message tmpMessage = msg;
if (msgBodyCompressed) {
//If msg body was compressed, msgbody should be reset using prevBody.
//Clone new message using commpressed message body and recover origin massage.
//Fix bug:https://github.com/apache/rocketmq-externals/issues/66
tmpMessage = MessageAccessor.cloneMessage(msg);
msg.setBody(prevBody);
}
long costTimeAsync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeAsync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
tmpMessage,
requestHeader,
timeout - costTimeAsync,
communicationMode,
sendCallback,
topicPublishInfo,
this.mQClientFactory,
this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
context,
this);
break;
//单向以及同步
case ONEWAY:
case SYNC:
long costTimeSync = System.currentTimeMillis() - beginStartTime;
if (timeout < costTimeSync) {
throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");
}
sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
brokerAddr,
mq.getBrokerName(),
msg,
requestHeader,
timeout - costTimeSync,
communicationMode,
context,
this);
break;
default:
assert false;
break;
}
本文主要阐述了消息发送的基本流程。其中包括了获取Broker地址、设置全局ID、构建请求包以及发送消息了。在下篇文章中我们继续来看消息发送的底层流程。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/Diamond_Tao/article/details/97614527
内容来源于网络,如有侵权,请联系作者删除!