我有两个通过rabbitmq通信的微服务,我需要实现优先级消息。
第一个微服务充当发布者,使用symfony + messenger(amqp传输)编写。
第二个微服务充当消费者,用python + pika编写。
messenger文档(https://symfony.com/doc/current/messenger.html#prioritized-transports)建议为不同的消息优先级使用单独的队列,此组件无法使用rabbitmq的内置功能来区分消息的优先级。实际上发布者没有问题,我配置了它,以便必要的消息进入优先级队列。
消费者出现了问题,我不能让pika先读取优先级队列,然后读取常规队列。
下面是我的messenger组件配置的一个例子:
framework:
messenger:
transports:
priority:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: priority
queues:
priority: ~
normal:
dsn: "%env(MESSENGER_TRANSPORT_DSN)%"
options:
exchange:
name: normal
queues:
normal: ~
routing:
'App\Message\PriorityRequest': priority
'App\Message\NormalRequest': normal
这就是我如何填充队列:
for ($i = 0; $i < 10; $i++) {
$bus->dispatch(new PriorityRequest($i, 'priority'));
$bus->dispatch(new NormalRequest($i, 'normal'));
}
下面是一个python + pika中的消费者实现的例子:
import pika
import os
def do_work(self, connection, channel, delivery_tag, body):
print(body)
parameters = pika.URLParameters(os.getenv('MESSENGER_TRANSPORT_DSN'))
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.basic_qos(prefetch_count=1)
channel.queue_declare(queue='priority', durable=True)
channel.queue_declare(queue='normal', durable=True)
channel.basic_consume(queue='priority', on_message_callback=do_work, auto_ack=True)
channel.basic_consume(queue='normal', on_message_callback=do_work, auto_ack=True)
channel.start_consuming()
如果我们运行消费者代码,我们会得到以下输出:
{'id': 0, 'data': 'priority'}
{'id': 0, 'data': 'normal'}
{'id': 1, 'data': 'priority'}
{'id': 1, 'data': 'normal'}
{'id': 2, 'data': 'priority'}
{'id': 2, 'data': 'normal'}
{'id': 3, 'data': 'priority'}
{'id': 3, 'data': 'normal'}
{'id': 4, 'data': 'priority'}
{'id': 4, 'data': 'normal'}
{'id': 5, 'data': 'priority'}
{'id': 5, 'data': 'normal'}
{'id': 6, 'data': 'priority'}
{'id': 6, 'data': 'normal'}
{'id': 7, 'data': 'priority'}
{'id': 7, 'data': 'normal'}
{'id': 8, 'data': 'priority'}
{'id': 8, 'data': 'normal'}
{'id': 9, 'data': 'priority'}
{'id': 9, 'data': 'normal'}
消息以FIFO顺序处理,我如何强制pika首先处理优先级队列中的消息,只有当优先级队列为空时才能进入正常队列?
1条答案
按热度按时间mkshixfv1#
Pika开箱即用不支持此功能。
一个选项是首先从优先级队列中删除
basic_consume
。当队列为空时,取消该消费者,然后从另一个队列中删除basic_consume
。当工作完成后,重复并返回到优先级队列。