我正在集成RabbitMQ和GCP Pub/Sub。我正在使用Pika
库与Python。连接由BlockingConnection
建立。
我的消费者作为单独的线程运行。最大预期消息工作负载高达100 msg/s。
根据我执行的一些初步测试,似乎解决方案是工作,但我有一些怀疑回调函数是如何构造的。我调用publish
方法发布到GCP中的Pub/Sub,然后在块中尝试,basic_ack
处理消息。
如果有人在这方面有经验,我可以要求给予一些意见,我的解决方案,并可能是一些例子,它可以如何实施。
def consume_data_callback(self, basic_deliver, body):
# ... some code
future = self.publisher.publish(topic_path, self.payload.SerializeToString())
try:
message_id = future.result(timeout=1)
self.channel.basic_ack(basic_deliver.delivery_tag)
except Exception as e:
future.cancel()
_logger.error("Result after publishing Pub/Sub with: {}".format(e))
字符串
谢谢你的回答。
1条答案
按热度按时间yb3bgrhw1#
我个人还没有尝试过,但我确实搜索了可能对你有帮助的参考资料。这篇RabbitMQ-发布/订阅文章有一个回调代码示例,可以帮助你确认你所写的是否是一个合适的解决方案。
GitHub完整代码版本:
字符串