如何在Python中使用RabbitMQ进行简单的单元测试?

igsr9ssn  于 2022-11-08  发布在  RabbitMQ
关注(0)|答案(2)|浏览(192)

在我的单元测试中,我想简单地开始消费,发布一条消息,接收返回的响应,并Assert响应是否是我所期望的。但是,我已经尝试了几个小时,但没有找到解决方案。
问题是我不能在类中定义一个停止消费的方法。我试着定义一个这样的方法:
第一个
但是似乎没有任何效果。我曾经读到过,这是因为一旦你执行了start_consuming(),停止消费的唯一方法就是在发送消息后取消它。但是如果我这样做,那么我将修改原始的on_request,这对我的应用程序没有任何用处。因为连接会在第一条消息后关闭。我已经找到了pytest-rabbitmq,但文档对我来说并不清楚,因此不知道我是否可以使用此插件来实现我想要的。
顺便问一下,basic_cancelstop_consumingclose之间的区别是什么?

23c0lvtd

23c0lvtd1#

我对您的场景没有一个清晰的概念!根据我的理解,您可以使用相同的方法创建连接和通道,以便在需要时可以发布、使用、Assert和停止使用
希望这对你有帮助!

def test_rabbitmq():
    from pika import BlockingConnection, ConnectionParameters, PlainCredentials

    conn = BlockingConnection(ConnectionParameters(host='host', virtual_host='vhost', credentials=PlainCredentials('username', 'password')))
    channel = conn.channel()

    # define your consumer
    def on_message(channel, method_frame, header_frame, body):
        message = body.decode()
        # assert your message here
        # asset message == 'value'
        channel.basic_cancel('test-consumer')  # stops the consumer

    # define your publisher
    def publish_message(message):
        channel.basic_publish(exchange='', routing_key='', body=message')

    publish('your message')
    tag = channel.basic_consume(queue='queue', on_message_callback=on_message, consumer_tag='test-consumer')

stop_consuming-取消所有使用者,并发出退出start_consuming循环的信号。
basic_cancel-此方法取消消费者。消费者标记将作为输入。
close-关闭连接/通道
Reference

33qvvth1

33qvvth12#

以下是pytest-rabbitmq插件的示例:

import asyncio
import rabbitpy

async def test_queue(fastapi_client, mocker, rabbitmq):
    mock_consumer_method = mocker.MagicMock(name="consumer_method")
    mocker.patch(
        "my.project.consumer_method",
        new=mock_consumer_method,
    )

    connection_details = rabbitmq.args
    os.environ[
        "RABBITMQ_CONNECTION_STRING"
    ] = f"amqp://{connection_details['username']}:{connection_details['password']}@{connection_details['host']}:{connection_details['port']}/"  # this is used by my consumer

    channel = rabbitmq.channel()
    queue = rabbitpy.Queue(
        channel, name="QUEUE_NAME", durable=False, message_ttl=5 * 60 * 1000
    )
    queue.declare()
    message = rabbitpy.Message(channel, "some message")
    message.publish(exchange="", routing_key=PRODUCE_QUEUE_NAME)

    with fastapi_client:  # triggers the consumer startup event
        await asyncio.sleep(20)

    mock_consumer_method.assert_called_once_with("some message")

我的FastAPI服务器中已经有一个消费者,您的用例可能会有所不同。

相关问题