如何使用Quarkus读取Rabbitmq流

eqqqjvef  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(191)

如何使用Quarkus从RabbitMQ流中读取数据。是否可以使用SmallRye来实现这一点?或者我必须直接使用兔子客户端?
我想
1.从rabbit流读取数据(不是队列)
1.从某个时间点重放来自Rabbit流的数据
我没有找到任何关于使用SmallRye Package 的参考资料
我有这个配置

# incoming
mp.messaging.incoming.requestss.connector=smallrye-rabbitmq
mp.messaging.incoming.requestss.exchange.name=quote-requests
mp.messaging.incoming.requestss.queue.name=quote-stream
mp.messaging.incoming.requestss.queue.x-queue-type=stream

但我得到以下错误

Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - consumer prefetch count is not set for 'queue 'quote-stream' in vhost '/'', class-id=60, method-id=20)
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
        at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:1378)
        ... 15 more
hujrc8aj

hujrc8aj1#

RabbitMQ Streams模型是一个只添加消息的日志。每个消费者开始阅读日志的指定偏移量。由于这种QoS(a.k.a.预取计数)必须设置为传入通道。在SmallRye RabbitMQ连接器中,max-outstanding-messages配置属性控制QoS值

mp.messaging.incoming.requestss.connector=smallrye-rabbitmq
mp.messaging.incoming.requestss.excahnge.name=quote-requests
mp.messaging.incoming.requestss.queue.name=quote-stream
mp.messaging.incoming.requestss.queue.x-queue-type=stream

# sets prefetch count / QoS to 100
mp.messaging.incoming.requestss.max-outstanding-messages=100

注意:我还没有找到配置RabbitMQ Streams特定参数的可能性(例如:x-stream-offsetx-max-agex-stream-max-segment-size-bytes等),所以我假设每个客户端都将使用这些参数的默认值。如果小黑麦做的话就太好了。

相关问题