我使用的是RabbitMQ。我定义了一个具有优先级的队列,我可以使用RMQ GUI向该队列发送具有某个优先级值的消息,消费者也可以按排序顺序获得消息,但当我尝试使用流桥从我的java代码发送消息时,我不知道如何指定消息的优先级。以下是我尝试的方法:
1.我添加了x-max-priority:10添加到队列中。
1.消费者示例=
@Bean
public Consumer<Message<String>> testListener() {
return (m) -> {
System.out.println("inside consumer with message : " + m);
System.out.println("headers : " + m.getHeaders());
System.out.println("payload : " + m.getPayload());
};
}
1.生成器示例=
@GET
@Path("test/")
public void test(@Context HttpServletRequest request) {
System.out.println("inside test");
try {
String payload = "hello world";
logger.info("going to send a message : {}", payload);
int priority = 5;
Message<String> message = MessageBuilder.withPayload(payload)
.setHeader("priority", priority)
.build();
boolean res = STREAM_BRIDGE.send("testWriter-out-0", message);
System.out.println(message);
System.out.println(res);
} catch (Exception e) {
logger.error(e);
}
}
生成方的输出=
-> inside test
-> GenericMessage [payload=hello world, headers={priority=5, id=some_id, timestamp=epoch}]
-> true
消费者的输出=
-> inside consumer with message : GenericMessage [payload=hello world, headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=test_exchange, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_exchange.ats, amqp_redelivered=false, amqp_receivedRoutingKey=test_exchange, amqp_timestamp=date_time, amqp_messageId=some_id, id=some_id, amqp_consumerTag=some_tag, sourceData=(Body:'hello world' MessageProperties [headers={}, timestamp=date_time, messageId=some_id, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test_exchange, deliveryTag=1, consumerTag=some_tag, consumerQueue=test_exchange.ats]), contentType=application/json, timestamp=epoch}]
-> headers : {amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=test_exchange, amqp_deliveryTag=1, deliveryAttempt=1, amqp_consumerQueue=test_exchange.ats, amqp_redelivered=false, amqp_receivedRoutingKey=test_exchange, amqp_timestamp=date_time, amqp_messageId=some_id, id=some_id, amqp_consumerTag=tag, sourceData=(Body:'hello world' MessageProperties [headers={}, timestamp=date_time, messageId=some_id, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=test_exchange, receivedRoutingKey=test_exchange, deliveryTag=1, consumerTag=tag, consumerQueue=test_exchange.ats]), contentType=application/json, timestamp=epoch}
-> payload : hello world
因此,消息将发送到RMQ,使用者也将获得消息,但在RMQGUI上,当我对Queue执行Get-message操作时,会得到以下结果=〉
Message 1
The server reported 0 messages remaining.
Exchange test_exchange
Routing Key test_exchange
Redelivered ○
Properties
timestamp: timestamp
message_id: some_id
priority: 0
delivery_mode: 2
headers:
content_type: application/json
Payload hello world
11 bytes
Encoding: string
正如我们在上面的结果中所看到的,RMQ将priority设置为0(因此在Consumer中,我以FIFO的方式而不是基于优先级的方式获取消息),并且在头内:仅存在一个报头“content_type:application/json”,所以我认为优先级不是头的一部分而是属性的一部分,那么如何使用StreamBridge设置消息属性呢?
最后,我正在尝试弄清楚如何在使用StreamBridge发送消息时动态地设置消息的优先级,任何帮助都将不胜感激,提前感谢!
1条答案
按热度按时间edqdpe6u1#
请考虑使用最新的Spring Cloud Stream:https://spring.io/projects/spring-cloud-stream#learn。
显然,您的
spring-cloud-starter-stream-rabbit = 3.0.3.RELEASE
已经足够老了,足以受到https://github.com/spring-cloud/spring-cloud-stream/issues/1931问题的困扰。我刚刚测试了最新的一个,我得到了正确的
priority
属性的消息张贴到RabbitMQ队列由提到的StreamBridge
。