如何使用StreamBridge将具有优先级的消息发送到RabbitMQ

6jygbczu  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(1)|浏览(468)

我使用的是RabbitMQ。我定义了一个具有优先级的队列,我可以使用RMQ GUI向该队列发送具有某个优先级值的消息,消费者也可以按排序顺序获得消息,但当我尝试使用流桥从我的java代码发送消息时,我不知道如何指定消息的优先级。以下是我尝试的方法:
1.我添加了x-max-priority:10添加到队列中。
1.消费者示例=

@Bean
 public Consumer<Message<String>> testListener() {
 return (m) -> {
     System.out.println("inside consumer with message : " + m);
     System.out.println("headers : " + m.getHeaders());
     System.out.println("payload : " + m.getPayload());
 };
 }

1.生成器示例=

@GET
 @Path("test/")
 public void test(@Context HttpServletRequest request) {
 System.out.println("inside test");
 try {
     String payload = "hello world";
     logger.info("going to send a message : {}", payload);
     int priority = 5;
     Message<String> message = MessageBuilder.withPayload(payload)
                     .setHeader("priority", priority)
                     .build();
     boolean res = STREAM_BRIDGE.send("testWriter-out-0", message);
     System.out.println(message);
     System.out.println(res);
 } catch (Exception e) {
     logger.error(e);
 }
}

生成方的输出=

-> inside test
    -> GenericMessage [payload=hello world, headers={priority=5, id=some_id, timestamp=epoch}]
    -> true

消费者的输出=

-> inside consumer with message : GenericMessage [payload=hello world, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=test_exchange, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_exchange.ats, amqp_redelivered=false, amqp_receivedRoutingKey=test_exchange, amqp_timestamp=date_time, amqp_messageId=some_id, id=some_id, amqp_consumerTag=some_tag, sourceData=(Body:'hello world' MessageProperties [headers={}, timestamp=date_time, messageId=some_id, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test_exchange, deliveryTag=1, consumerTag=some_tag, consumerQueue=test_exchange.ats]), contentType=application/json, timestamp=epoch}]
    -> headers : {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=test_exchange, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_exchange.ats, amqp_redelivered=false, amqp_receivedRoutingKey=test_exchange, amqp_timestamp=date_time, amqp_messageId=some_id, id=some_id, amqp_consumerTag=tag, sourceData=(Body:'hello world' MessageProperties [headers={}, timestamp=date_time, messageId=some_id, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test_exchange, deliveryTag=1, consumerTag=tag, consumerQueue=test_exchange.ats]), contentType=application/json, timestamp=epoch}
    -> payload : hello world

因此,消息将发送到RMQ,使用者也将获得消息,但在RMQGUI上,当我对Queue执行Get-message操作时,会得到以下结果=〉

Message 1
    The server reported 0 messages remaining.
    Exchange    test_exchange
    Routing Key test_exchange
    Redelivered ○
    Properties  
                timestamp:  timestamp
                message_id: some_id
                priority:   0
                delivery_mode:  2
                headers:    
                content_type:   application/json

    Payload     hello world
    11 bytes
    Encoding: string

正如我们在上面的结果中所看到的,RMQ将priority设置为0(因此在Consumer中,我以FIFO的方式而不是基于优先级的方式获取消息),并且在头内:仅存在一个报头“content_type:application/json”,所以我认为优先级不是头的一部分而是属性的一部分,那么如何使用StreamBridge设置消息属性呢?
最后,我正在尝试弄清楚如何在使用StreamBridge发送消息时动态地设置消息的优先级,任何帮助都将不胜感激,提前感谢!

edqdpe6u

edqdpe6u1#

请考虑使用最新的Spring Cloud Stream:https://spring.io/projects/spring-cloud-stream#learn。
显然,您的spring-cloud-starter-stream-rabbit = 3.0.3.RELEASE已经足够老了,足以受到https://github.com/spring-cloud/spring-cloud-stream/issues/1931问题的困扰。
我刚刚测试了最新的一个,我得到了正确的priority属性的消息张贴到RabbitMQ队列由提到的StreamBridge

相关问题