from pika import BasicProperties, URLParameters
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelWrongStateError, StreamLostError, AMQPConnectionError
from pika.exchange_type import ExchangeType
import json
def consume_messages(queue_name: str, batch_size: int = 500, inactivity_timeout=None):
    """Drain up to ``batch_size`` JSON messages from ``queue_name`` and return them.

    The queue is (re)declared as durable to read its current message count.
    At most ``min(batch_size, message_count)`` deliveries are consumed; each
    one is acknowledged, and bodies that parse as JSON are collected.

    Relies on the module-level ``channel`` and ``logger`` objects, which are
    defined outside this function.

    Args:
        queue_name: Name of the queue to consume from.
        batch_size: Upper bound on the number of deliveries handled per call.
        inactivity_timeout: Seconds to wait for the next delivery before
            giving up. ``None`` (the default) waits indefinitely, which is
            the original behavior.

    Returns:
        A list of decoded JSON payloads. Deliveries whose body is not valid
        JSON are logged, acknowledged, and left out of the list.

    Note:
        When the queue is empty the function returns immediately and leaves
        the channel open; otherwise the channel is closed before returning.
    """
    msgs = []
    q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
    msgs_limit = min(batch_size, q.method.message_count)
    if msgs_limit <= 0:
        return msgs

    # Count deliveries locally rather than comparing against delivery_tag:
    # tags are a per-channel counter, so on a channel that has already
    # delivered messages the tag would never equal the limit.
    delivered = 0
    try:
        for method_frame, _properties, body in channel.consume(
            queue_name, inactivity_timeout=inactivity_timeout
        ):
            if method_frame is None:
                # Inactivity timeout elapsed with no delivery (the queue may
                # have been drained by another consumer since the declare).
                break
            try:
                msgs.append(json.loads(body.decode()))
            except (ValueError, TypeError):
                # ValueError covers both JSON and UTF-8 decoding failures.
                logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")
            # Acknowledge even malformed messages so they are not redelivered.
            channel.basic_ack(method_frame.delivery_tag)
            delivered += 1
            if delivered >= msgs_limit:
                break
        # Cancel the consumer; unacknowledged prefetched messages are requeued.
        requeued_messages = channel.cancel()
        logger.info('Requeued %i messages', requeued_messages)
    except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
        logger.info(f'Connection Interrupted: {str(e)}')
    finally:
        # Only tear down a channel that is still open; cleaning up a channel
        # that the connection error already closed could raise again and
        # mask the error handled above.
        if channel.is_open:
            channel.stop_consuming()
            channel.close()
    return msgs
2条答案
按热度按时间of1yzvn41#
您可以使用 `basic.get` API:它由客户端主动从代理(broker)拉取消息,而不是订阅队列后等待代理推送消息。
zi8p0yeb2#
下面是使用pika编写的代码。
下面的代码使用 `channel.consume` 开始消费消息,并在取到所需数量的消息后中断/停止。我设置了一个 `batch_size`,以防止一次拉取过多的消息;您可以随时修改 `batch_size` 来满足自己的需要。