Why are messages lost from a RabbitMQ queue when the consumer restarts?

Asked by 5kgi1eie on 2023-10-20, in RabbitMQ

I have set up a RabbitMQ consumer as follows:

from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor

import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.INFO,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)

class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name = queue
        self.logger = logger
        self.consumer_id = 'Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(self.handle_delivery, queue=self.queue_name, no_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            self.channel.queue_declare(queue=self.queue_name,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_connected(self, connection):
        connection.channel(self._on_channel_open)

    def consume(self):
        try:
            self.connection = SelectConnection(self.parameters,
                                               self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body

        return _body

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))

            # Do something
            sleep(randint(10, 100))

            time_taken = datetime.datetime.now() - start_time
            _logger.info("[{}] Time Taken: {}.{}".format(
                req.get("to_num"), time_taken.seconds, time_taken.microseconds))

        except Exception as err:
            _logger.exception(err)

if __name__ == "__main__":
    workers = 3
    pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
    try:
        pool = ThreadPoolExecutor(max_workers=workers)
        start = 1
        for thread_id in range(start, (workers + start)):
            pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)

    except Exception as err:
        _logger.exception(err)

I also have a queue publisher, as follows:

import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.DEBUG,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)

class QueuePublisherClient(object):

    def __init__(self, queue, request):
        self.queue = queue
        self.response = None
        self.channel = None
        self.request = request
        self.corrId = str(uuid.uuid4())
        self.callBackQueue = None
        self.connection = None
        parameters = pika.ConnectionParameters(host="0.0.0.0")
        self.connection = SelectConnection(
            parameters, self.on_response_connected
        )
        self.connection.ioloop.start()

    def on_response(self, ch, method, props, body):
        if self.corrId == props.correlation_id:
            self.response = body
            self.connection.close()
            self.connection.ioloop.start()

    def on_response_connected(self, connection):
        _logger.info("Connected...\t(%s)" % self.queue)
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_connected(self, connection):
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        # _logger.info("Channel Opened...\t(%s)" % self.queue)
        self.channel = channel
        self.channel.queue_declare(queue=self.queue,
                                   durable=True,
                                   exclusive=False,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.channel.basic_publish(exchange="",
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(),
                                   body=str(self.request))
        self.connection.close()
        _logger.info("Message Published...\t(%s)" % self.queue)

if __name__ == "__main__":
    data = {
        'text': 'This is a sample text',
        'to_num': '+2547xxxxxxxx'
    }
    count = 10000

    for index in range(count):
        data['index'] = index
        QueuePublisherClient("test_queue", json.dumps(data))

When I publish 10000 messages to the queue with the consumer stopped, rabbitmqctl list_queues shows test_queue holding 10000 messages. As soon as I start the consumer and run rabbitmqctl list_queues again, the queue shows 0 messages, even though the consumer is still working through them. The problem: if I stop the consumer for a few seconds and then restart it, I cannot recover the unprocessed messages. How can I avoid this?
This is just a simulation of the real scenario, in which the consumer process gets restarted by monit and I suffer message loss.

Answer 1 (ogsagwnx):

First of all, you should use the latest version of Pika.
When you set no_ack=True (auto_ack=True in Pika 1.0), RabbitMQ considers a message acknowledged the moment it is delivered. That means every message your consumer is holding in memory (or in the TCP stack) when you stop it is lost, because RabbitMQ believes it has already been acknowledged.
You should use no_ack=False (the default) and acknowledge the message in handle_delivery once the work is done. Note that if the work takes a long time, you should run it in another thread to avoid blocking Pika's I/O loop.
See the following documentation: https://www.rabbitmq.com/confirms.html
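
As a rough sketch of what this could look like (not the answerer's exact code), assuming Pika 1.x, where basic_consume takes on_message_callback and auto_ack defaults to False: the work runs on a worker thread, and the ack is handed back to the I/O loop via add_callback_threadsafe, which is Pika's documented pattern for acknowledging from another thread. These two methods would replace the ones in the QueueConsumer class above:

import functools
import threading

def _on_queue_declared(self, frame):
    self.channel.basic_qos(prefetch_count=1)
    # Pika 1.x signature: auto_ack defaults to False, so any message
    # delivered but never acked is requeued when the consumer dies.
    self.channel.basic_consume(queue=self.queue_name,
                               on_message_callback=self.handle_delivery)

def handle_delivery(self, channel, method, header, body):
    def work():
        req = json.loads(self.decode(body))
        # ... long-running processing happens here, off the I/O loop ...
        # basic_ack must run on the connection's thread, so schedule it
        # back onto Pika's I/O loop once the work is finished.
        ack = functools.partial(channel.basic_ack,
                                delivery_tag=method.delivery_tag)
        self.connection.ioloop.add_callback_threadsafe(ack)

    threading.Thread(target=work, daemon=True).start()

With this in place, stopping the consumer mid-stream leaves the unacknowledged messages in the queue, and RabbitMQ redelivers them on restart.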

Answer 2 (tvz2xvvm):

Since you declared prefetch_count=1 and your queue is durable, the consumer will pick up messages one at a time once it starts. To verify this, put a one-second sleep in the code and try restarting the consumer after a few seconds: you will see that only the messages that were actually processed have been removed from the queue. Without a prefetch count set, a single consumer starting up drains every message from the queue at once. Hope that helps.
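
A minimal way to run the experiment this answer describes, assuming the manual-ack setup from the first answer (this version acks directly on the I/O loop thread, which is fine for a short sleep, though as the first answer notes real work belongs on another thread):

def handle_delivery(self, channel, method, header, body):
    # Simulate exactly one second of work per message.
    sleep(1)
    # With manual acks and prefetch_count=1, only the message acked
    # here disappears from the queue; stopping the consumer now would
    # requeue everything still unacknowledged.
    channel.basic_ack(delivery_tag=method.delivery_tag)

Comparing rabbitmqctl list_queues output before and after the restart should then show the unprocessed messages still sitting in test_queue.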
