rabbitmq aio-pika 2消费者通过fanout接收消息,但不是在同一时间

yrdbyhpb  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(136)

我是RabbitMQ和Pika的新手,但我认为我已经清楚地了解了它的工作原理。
我需要实现这一点:
生产者创建消息并通过扇出交换发送,多个生产者(测试环境中为2个)接收相同的消息。
但每次只有1个消费者收到消息

  • 2019-11-29 19:02:44.167549 b 'Hello'* -第一位消费者
  • 2019-11-29 19:02:45.068192 b 'Hello'* -第二位消费者

制片人:

async def main(loop):
        connection = await connect_robust(
            "amqp://guest:[email protected]/", loop=loop
        )

        queue_name = "test_queue"
        routing_key = "test_queue"

        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange('test_exchange',
                                                  ExchangeType.FANOUT, auto_delete=True
                                                  )

        # Declaring queue
        queue = await channel.declare_queue(
            queue_name, auto_delete=True
        )

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes('Hello', 'utf-8'),
                content_type='text/plain',
                headers={'foo': 'bar'}
            ),
            routing_key
        )
    )

    if __name__ == "__main__":
        loop = asyncio.get_event_loop()
        loop.run_until_complete(main(loop))

消费者:

async def main(loop):
            connection = await aio_pika.connect_robust(host='192.168.1.3', login='guest', password='guest', loop=loop
                                                       )

            queue_name = "test_queue"

            async with connection:
                # Creating channel
                channel = await connection.channel()

                # Declaring queue
                queue = await channel.declare_queue(
                    queue_name, auto_delete=True
                )

                async with queue.iterator() as queue_iter:
                    async for message in queue_iter:
                        async with message.process():
                            print(datetime.datetime.now(), message.body)

                            if queue.name in message.body.decode():
                                break

    if __name__ == "__main__":
          loop = asyncio.get_event_loop()
          loop.run_until_complete(main(loop))
          loop.close()
vc6uscn9

vc6uscn91#

首先,我假设您正在运行两个独立的消费者进程。
每个消费者都应该将自己的队列绑定到扇出交换。不要使用共享队列。一种解决方案是让每个消费者使用独占队列。
生产者不需要创建队列并将其绑定到扇出交换,只要您的消费者先开始即可。
先试试这个。然后,如果您需要考虑生产者可以首先启动,则必须创建两个具有已知名称的队列,并绑定它们。消费者在开始时也应该做同样的事情。

相关问题