从cloud pubsub确认/处理我的所有消息都需要超过600秒的时间限制,因此,在我处理消息时,我需要最多计算590秒,并请求将确认截止时间再延长600秒(这样其他订户就不会获得重新提交)。我的计划是编写一个 Package 器,等待处理运行并
while awaited processing_function not finished:
update the deadline
我用于处理的基本代码:
subscriber = pubsub_v1.SubscriberClient()
subscription_path = "projects/path/to/subscription"
def callback(message):
print(f'Received message: {message}')
print(f'data: {message.data}')
if message.attributes:
# PROCESS THAT'S GOING TO TAKE A LONG TIME
# ...
message.ack()
# subscribe method provides an asynchronous interface for processing its callback
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
with subscriber: # wrap subscriber in a 'with' block to automatically call close() when done
try:
streaming_pull_future.result() # going without a timeout will wait & block indefinitely
except TimeoutError:
streaming_pull_future.cancel() # trigger the shutdown
streaming_pull_future.result()
我如何实施计划,不断更新截止日期,直到回调运行完毕?我在asyncio方面不是很有经验。我甚至可以有一个 Package 器函数来等待回调,并在等待时作为后台进程运行while循环吗?
1条答案
按热度按时间nom7f22z1#
官方客户端源代码说“默认实现为您处理此问题;您不需要手动设置确认截止日期。”