rabbitmq 如何在aio_pika中使用死信交换拒绝消息?

g9icjywg  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(1)|浏览(291)

我正在使用aio_pika,当拒绝来自队列的消息时,rabbitMQ订阅工作者遇到了麻烦。我拒绝了队列中的消息,并将其交换到死信队列。邮件到达死信队列。但消息不会从原始队列中消失。因此,工人没有停止,大量的消息进入死信队列。
当消息被拒绝时,我想从原始队列中删除消息。
我试过了。你能帮帮我吗?

from aio_pika.abc import AbstractIncomingMessage

async def callback(
    message: AbstractIncomingMessage,
) -> None:
    try:
        async with message.process(ignore_processed=True):
            parsed = json.loads(message.body)
            success = False
            if False:
                log.msg("[x] Reject")
                await message.reject(requeue=False)
            else:
                log.msg("[x] Updated!")
af7jpaap

af7jpaap1#

让我更清楚

from aio_pika.abc import AbstractIncomingMessage

def some_validation(message) -> bool:
    # some logic
    return True

async def callback(
    message: AbstractIncomingMessage,
) -> None:
    async with message.process(ignore_processed=True):
        parsed = json.loads(message.body)
        success = False
        if some_validation(parsed):
            log.msg("[x] Updated!")
            # some logic
            return None    
        log.msg("[x] Reject")
        await message.reject(requeue=False)

相关问题