我是RabbitMQ和Pika的新手,但我认为我已经清楚地了解了它的工作原理。
我需要实现这一点:
生产者创建消息并通过扇出交换发送,多个生产者(测试环境中为2个)接收相同的消息。
但每次只有1个消费者收到消息
- 2019-11-29 19:02:44.167549 b 'Hello'* -第一位消费者
- 2019-11-29 19:02:45.068192 b 'Hello'* -第二位消费者
制片人:
async def main(loop):
connection = await connect_robust(
"amqp://guest:[email protected]/", loop=loop
)
queue_name = "test_queue"
routing_key = "test_queue"
# Creating channel
channel = await connection.channel()
# Declaring exchange
exchange = await channel.declare_exchange('test_exchange',
ExchangeType.FANOUT, auto_delete=True
)
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
# Binding queue
await queue.bind(exchange, routing_key)
await exchange.publish(
Message(
bytes('Hello', 'utf-8'),
content_type='text/plain',
headers={'foo': 'bar'}
),
routing_key
)
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
消费者:
async def main(loop):
connection = await aio_pika.connect_robust(host='192.168.1.3', login='guest', password='guest', loop=loop
)
queue_name = "test_queue"
async with connection:
# Creating channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue(
queue_name, auto_delete=True
)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(datetime.datetime.now(), message.body)
if queue.name in message.body.decode():
break
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
1条答案
按热度按时间vc6uscn91#
首先,我假设您正在运行两个独立的消费者进程。
每个消费者都应该将自己的队列绑定到扇出交换。不要使用共享队列。一种解决方案是让每个消费者使用独占队列。
生产者不需要创建队列并将其绑定到扇出交换,只要您的消费者先开始即可。
先试试这个。然后,如果您需要考虑生产者可以首先启动,则必须创建两个具有已知名称的队列,并绑定它们。消费者在开始时也应该做同样的事情。