I have set up a RabbitMQ consumer as follows:
from collections import OrderedDict
from concurrent.futures import ThreadPoolExecutor
import pika
import datetime
import logging
import json
from logging import StreamHandler
from time import sleep
from random import randint
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.INFO,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueueConsumer(object):
    """The consumer class to manage connections to the AMQP server/queue"""

    def __init__(self, queue, logger, parameters, thread_id=0):
        self.channel = None
        self.connection = None
        self.queue_name = queue
        self.logger = logger
        self.consumer_id = 'Thread: %d' % (thread_id,)
        self.parameters = pika.ConnectionParameters(**parameters)

    def _on_queue_declared(self, frame):
        self.logger.debug('{} ... declaring queue'.format(self.consumer_id))
        self.channel.basic_qos(prefetch_count=1)
        try:
            self.channel.basic_consume(self.handle_delivery,
                                       queue=self.queue_name,
                                       no_ack=True)
            self.logger.info("{} Declared queue...".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} crashing:--> {}'.format(self.consumer_id, str(e)))

    def _on_channel_open(self, channel):
        self.channel = channel
        try:
            self.channel.queue_declare(queue=self.queue_name,
                                       exclusive=False,
                                       durable=True,
                                       auto_delete=False,
                                       callback=self._on_queue_declared)
            self.logger.info("{} Opened Channel....".format(self.consumer_id))
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))

    def _on_connected(self, connection):
        connection.channel(self._on_channel_open)

    def consume(self):
        try:
            self.connection = SelectConnection(self.parameters,
                                               self._on_connected)
            self.connection.ioloop.start()
        except Exception as e:
            self.logger.error('{} {}'.format(self.consumer_id, str(e)))
            self.connection.close()
            self.connection.ioloop.start()

    def decode(self, body):
        try:
            _body = body.decode('utf-8')
        except AttributeError:
            _body = body
        return _body

    def handle_delivery(self, channel, method, header, body):
        try:
            start_time = datetime.datetime.now()
            _logger.info("Received...")
            _logger.info("Content: %s" % body)
            req = json.loads(self.decode(body))
            # Do something
            sleep(randint(10, 100))
            time_taken = datetime.datetime.now() - start_time
            _logger.info("[{}] Time Taken: {}.{}".format(
                req.get("to_num"), time_taken.seconds, time_taken.microseconds))
        except Exception as err:
            _logger.exception(err)


if __name__ == "__main__":
    workers = 3
    pika_parameters = OrderedDict([('host', '127.0.0.1'), ('port', 5672), ('virtual_host', '/')])
    try:
        pool = ThreadPoolExecutor(max_workers=workers)
        start = 1
        for thread_id in range(start, (workers + start)):
            pool.submit(QueueConsumer('test_queue', _logger, pika_parameters, thread_id).consume)
    except Exception as err:
        _logger.exception(err)
I also have a queue publisher, as shown below:
import uuid
import pika
import logging
import json
from logging import StreamHandler
from pika import SelectConnection

logging.basicConfig(handlers=[StreamHandler()],
                    level=logging.DEBUG,
                    format=logging.BASIC_FORMAT)
_logger = logging.getLogger(__name__)


class QueuePublisherClient(object):

    def __init__(self, queue, request):
        self.queue = queue
        self.response = None
        self.channel = None
        self.request = request
        self.corrId = str(uuid.uuid4())
        self.callBackQueue = None
        self.connection = None
        parameters = pika.ConnectionParameters(host="0.0.0.0")
        self.connection = SelectConnection(
            parameters, self.on_response_connected
        )
        self.connection.ioloop.start()

    def on_response(self, ch, method, props, body):
        if self.corrId == props.correlation_id:
            self.response = body
            self.connection.close()
            self.connection.ioloop.start()

    def on_response_connected(self, connection):
        _logger.info("Connected...\t(%s)" % self.queue)
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_connected(self, connection):
        self.connection = connection
        self.connection.channel(self.on_channel_open)

    def on_channel_open(self, channel):
        # _logger.info("Channel Opened...\t(%s)" % self.queue)
        self.channel = channel
        self.channel.queue_declare(queue=self.queue,
                                   durable=True,
                                   exclusive=False,
                                   auto_delete=False,
                                   callback=self.on_queue_declared)

    def on_queue_declared(self, frame):
        self.channel.basic_publish(exchange="",
                                   routing_key=self.queue,
                                   properties=pika.BasicProperties(),
                                   body=str(self.request))
        self.connection.close()
        _logger.info("Message Published...\t(%s)" % self.queue)


if __name__ == "__main__":
    data = {
        'text': 'This is a sample text',
        'to_num': '+2547xxxxxxxx'
    }
    count = 10000
    for index in range(count):
        data['index'] = index
        QueuePublisherClient("test_queue", json.dumps(data))
I published 10000 messages to the queue while the consumer was not running, and rabbitmqctl list_queues showed test_queue holding 10000 messages. As soon as I started the consumer and ran rabbitmqctl list_queues again, the queue showed 0 messages, even though the consumer was still working through them. The problem is that if I stop the consumer a few seconds later and then restart it, I cannot recover my messages. How can I avoid this loss?
This is only a simulation of the real setup, in which the consumer process gets restarted by monit and I lose messages.
2 Answers
ogsagwnx1#
First, you should use the latest version of Pika.
When you set no_ack=True (auto_ack=True in Pika 1.0), RabbitMQ considers a message acknowledged the moment it is delivered. This means that every message your consumer is holding in memory (or in the TCP stack) when you stop it is lost, because RabbitMQ believes it has already been acknowledged. You should use no_ack=False (the default) and acknowledge each message in handle_delivery once the work is done, as in the sketch below. Note that if your work takes a long time, you should run it in another thread to avoid blocking Pika's I/O loop. See the documentation: https://www.rabbitmq.com/confirms.html
tvz2xvvm2#
Because you have declared prefetch_count=1 and your queue is durable, the consumer processes messages one at a time after it starts. To verify this, put a 1-second sleep in the code and try restarting the consumer after a few seconds; you will see that only the already-processed messages have been removed from the queue. If no prefetch count is set, a single consumer drains all the messages in the queue as soon as it starts (see the sketch below). Hope this helps.
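For illustration, a minimal sketch of the two prefetch settings contrasted above, assuming Pika 1.x and manual acknowledgements (with auto_ack=True, as in the question's code, the prefetch limit does not apply because there are never any unacked messages); start_consumer is a hypothetical helper, not the question's code:

    def start_consumer(channel, handle_delivery, limit_prefetch=True):
        """Start consuming test_queue with or without a prefetch limit."""
        if limit_prefetch:
            # At most one unacknowledged message in flight: the unprocessed
            # backlog stays visible in `rabbitmqctl list_queues` until each
            # message is acked.
            channel.basic_qos(prefetch_count=1)
        # Without a basic_qos call (unlimited prefetch), RabbitMQ pushes the
        # whole backlog to this one consumer at once, so the queue appears to
        # drop to 0 messages before any of them have actually been processed.
        channel.basic_consume(queue='test_queue',
                              on_message_callback=handle_delivery,
                              auto_ack=False)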