【RocketMq实战第九篇】-不同场景解决方案

x33g5p2x  于2021-12-19 转载在 其他  
字(3.1k)|赞(0)|评价(0)|浏览(434)

前言

RocketMQ完结篇,主要使用消息队列处理一些业务场景。

正文

顺序消息

顺序消息分为全局顺序消息和部分顺序消息。

全局顺序消息指某个 Topic 下的 所有消息都要保证顺序;

部分顺序消息只要保证每一组消息被顺序消费即可,如订单消息的例子,只要保证 同一个订单 ID 的三个消息能按顺序消费 即可。

全局顺序消息

要保 证全局顺序消息, 需要 先把 Topic 的读写队列数设置为 一,然后 Producer 和 Consumer 的并发设置也要是一 。 简单来说,为了保证整个 Topic 的 全局消息有序,只能消除所有的并发处理,各部分都设置成单线程处理 。 这时 高并发、高吞吐量的功能完全用不上了 。

部分顺序消息

要保证部分消息有序,需要发送端和消费端配合处理 。 在发送端,要做到 把同一业务 ID 的消息发送到同一个 Message Queue ;在消费过程中,要做到从 同一个 Message Queue 读取的消息不被并发处理,这样才能达到部分有序 。

发送端使用 MessageQueueSelector类来控制 把消息发往哪个 Message Queue,如代码:

for (int i= 0; i <100; i++) { 
    int orderid = i ;
    Message msg =new Message ("OrderTopic8”, tags,”KEY" + i ,
    (”Hello RocketMQ " +orderid+” ” + i) .getBytes (RemotingHelper . DEFAULT_CHARSET)) ;
    SendResult sendResult = Producer.send(msg, new MessageQueueSelector()
        @Override
        public MessageQueue select (List<MessageQueue> mqs , Message msg ,Object arg) {
            System.out.println(”queue selector mq nums:”+mqs.size()); 
            System.out.println (”msg info : ”+msg . toString()) ;
                for (MessageQueue mq: mqs) {
                    System.out.println (mq.toString()) ;
                    }
                Integer id = (Integer) arg ; 
                int index = id % mqs.size();
                 return mqs.get(index) ;
                 }
           }, orderid );
            System .out.println(sendResult)
    }

消息重复问题

RocketMQ 选 择了确保一定投递,保证消息不丢失,但有可能造成消息重复 。setRetryTimesWhenSendFailed, 设置在同步方式下自动重试的次数,默认值是 2,这样当第一次发送消息时, Broker端接收到了消息但 是没有正 确返回发送成功的状态,就造成了消息 重复。

消息重复一般情况下不会发生,但是如果消息量大,网络有波动,消息重 复就是个大概率事件。

解决消息重复有两种方法:

  • 第一种方法是保证消费逻辑的幕等性(多次调 用和一次调用效果相同);
  • 另一种方法是维护一个巳消费消息的记录,消费前查 询这个消息是否被消费过 。 这两种方法都需要使用者自己实现 。

消息优先级

RocketMQ 是个先人先出的队列,不支持消息级别或者 Topic 级别的优先级 。可以通过间接的方式解决,下面列举三种优先级相 关需求的具体处理方法 。

第一种

多个不同的消息类型使用同一个topic时,由于某一个种消息流量非常大,导致其他类型的消息无法及时消费,造成不公平,所以把流量大的类型消息在一个单独的 Topic,其他类型消息在另外一个 Topic,应用程序创建两个 Consumer,分别订阅不同的 Topic,这样就可以了。

第二种

情况和第一种情况类似,但是不用创建大量的 Topic。举个实际应用场景: 一个订单处理系统,接收从 100家快递门店过来的请求,把这些请求 通过 Producer 写人 RocketMQ ;订单处理程序通过 Consumer 从队列里读取消 息并处理,每天最多处理 1 万单 。 如果这 100 个快递门店中某几个门店订单量 大增,比如门店一接了个大客户,一个上午就发出 2万单消息请求,这样其他 的 99 家门店可能被迫等待门店一的 2 万单处理完,也就是两天后订单才能被处 理,显然很不公平 。

这时可以创建 一 个 Topic, 设置 Topic 的 MessageQueue 数 量 超过 100 个,Producer根据订单的门店号,把每个门店的订单写人 一 个 MessageQueue。 DefaultMQPushConsumer默认是采用循环的方式逐个读取一个 Topic 的 所有 MessageQueue,这样如果某家门店订单 量 大增,这家门店对应的 MessageQueue 消息数增多,等待时间增长,但不会造成其他家门店 等 待时间 增长。

DefaultMQPushConsumer 默认的 pullBatchSize 是 32,也就是每次从某个 MessageQueue 读取消息的时候,最多可以读 32 个 。 在上面的场景中,为了更 加公平,可以把 pullBatchSize 设置成 1。

第三种

强制优先级

TypeA、 TypeB、 TypeC 三类消息 。 TypeA 处于第 一优先级,要确保只要有 TypeA消息,必须优先处理; TypeB处于第二优先 级; TypeC 处于第 三 优先级 。 对这种要求,或者逻辑更复杂的要求,就要用 户自己编码实现优先级控制,如果上述的 三 类消息在一个 Topic 里,可以使 用 PullConsumer,自主控制 MessageQueue 的遍历,以及消息的读取;如果 上述三类消息在三个 Topic下,需要启动三个 Consumer, 实现逻辑控制三个 Consumer 的消费 。

提高 Consumer 处理能力

(1)提高消费并行度

在同一个 ConsumerGroup 下( Clustering 方式),可以通过增加 Consumer 实例的数量来提高并行度,通过加机器,或者在 已有机器中启动 多个 Consumer 进程都可以增加 Consumer实例数。注意总的 Consumer数量不要超过 Topic下 Read Queue 数量,超过的 Consumer 实例接收不到消息。

(2)以批量方式进行消费

多条消息同时处理的时间会大大小于逐个处理的时间总和,比如消费消息中涉及 update 某个数据库, 一次 update10条的时间会大大小于十次 update1条数据的时 间 。 这时可以通过批量方式消费来提高消费的吞吐 量 。

实现方法是设置 Consumer 的 consumeMessageBatchMaxSize 这个参数 ,默 认是 1,如果设置为 N,在消息多的时候每次收到的是个长度为 N的消息链表。

(3)检测延时情况,跳过非重要消息

public ConsumeConcurrentlyStatus consumeMessage (List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    long Offset = msgs.get (0).getQueueOffset() ;
    String maxOffset = msgs.get(0).getProperty(Message PROPERTY MAX OFFSET); 
    long d iff = Long.parseLong(maxOffset) - Offset;
        if (diff > 90000) {
        return ConsumeConcurrentlyStatus .CONSUME SUCCESS;
    //正常消费消息
    return ConsumeConcurrentlyStatus . CONSUME SUCCESS; 
}

当某个队列的消息数堆积到 90000条以上,就直接丢弃,以 便快速追上发送消息的进度 。

相关文章