本文来介绍RocketMQ生产者发送消息默认使用的DefaultMQProducer类。
生产者 向消息队列里写入消息,不 同的业务场景需要生产者采用不同的写入策略 。 比如同步发送、异步发送、 延迟发送、 发送事务消息等。
我们结合代码来了解一下,
代码位置在package org.apache.rocketmq.example.simple;
public class AsyncProducer {
public static void main(
String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i = 0; i < 10000000; i++) {
try {
final int index = i;
Message msg = new Message("Jodie_topic_1023",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
}
}
代码讲解
发送消息要经过几个步骤 :
(1 )设置 Producer 的 GroupName。
(2 )设置 lnstanceName,当一个 Jvm 需要启动多个 Producer 的时候,通过设置不同的 InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。(本例没有写)
( 3 )设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次 。
(4 )设置 NameServer 地址 。
(5 )组装消息并发送 。
消息的发送有同步和异步两种方式,上面的代码使用的是异步方式 。消息发送的返回状态有如下四种 : FLUSH_DISK_TIMEOUT 、 FLUSH_SLAVE_TIMEOUT 、SLAVE_NOT_AVAILABLE 、SEND_OK,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的 。
发送延迟消息
Broker收到这类消息后 ,延迟一段时间再处理, 使消息在规定的一段时间后生效。
使用方法:在创建 Message对象时,调用 setDelayTimeLevel ( int level) 方法设置延迟时间, 然后再把这个消息发送 出去。 目前延迟的时间不支 持任意设置,仅支持预设值的时间长度 ( 1s/5s/1Os/30s/Im/2m/3m/4m/5m/6m/ 7m/8m/9m/1Om/20m/30m/1h/2h)。 比如 setDelayTimeLevel(3)表示延迟 10s。
自定义消息发送规则
一个 Topic会有多个 Message Queue,如果使用 Producer的默认配置,这 个 Producer 会轮流向各个 Message Queue 发 送 消息 。 Consumer 在消费消息的 时候,会根据负载均衡策略,消费被分配到的 Message Queue,如果不经过特 定的设置,某条消息被发往哪个 Message Queue,被哪个 Consumer 消费是未 知的。
如果业务 需 要我们把消息 发 送到指定的 Message Queue 里,比如把同 一 类型 的消息都发 往 相同的 Message Queue,可以用 Message QueueSelector。
发送消息的时候,把 MessageQueueSelector 的对象作为参数,使用 public SendResult send ( Message msg, MessageQueueSelector selector, Object arg)函 数发送消 息即可 。 在 MessageQueueSelector 的实现中,根据传人的 Object参 数,或者根据 Message 消息内容确定把消息发往那个 Message Queue,返回被 选中的 Message Queue。
版权说明 : 本文为转载文章, 版权归原作者所有 版权申明
原文链接 : https://blog.csdn.net/weixin_38003389/article/details/86673459
内容来源于网络,如有侵权,请联系作者删除!