我正在Celery上运行一个周期性任务,每3分钟执行一次相同的代码。如果条件为True,则执行一个操作(发送一条消息),但我只需要发送一次消息。
理想的情况是,如果发送了消息,在接下来的24小时内不能发送(即使函数会每3分钟执行一次),在这24小时之后,条件会被再次检查,如果仍然为True,消息会被再次发送。我如何用Python来实现这一点?我在这里放了一些代码:
if object.shussui:
client.conversation_start({
'channelId': 'x',
'to': 'user',
'type': 'text',
'content': {
'text': 'body message')
}
})
这里正在检查条件,如果shussui
是True
,则client.start_conversation
发送消息。
2条答案
按热度按时间o8x7eapl1#
您可以保留最后一次发送邮件的时间,并检查是否已经过去24小时:
ttygqcqt2#
除了像Mureinik's answer中那样添加对上次执行时间的检查之外,如果您有多个工作线程,您应该获取上次执行时间并将其存储在一个中央共享位置,例如,无论您使用的是哪一个消息代理,或者存储到一个数据库。
last_sent
的密钥还应包括确定性签名,以匹配唯一消息和/或用户ID;或者任何你需要的条件。值是一个datetime对象或者一个int/float类型的秒数(对于time.time()
)如果不将其存储在一个中央/共享位置,那么当另一个worker/示例获得同一消息的下一个任务时,它将立即被重新发送,因为之前的
last_sent
将只存储在之前worker的内存中,而当前worker将没有更新的值。如何存储
last_sent
信息的示例:last_sent:user:1234:msg:9876
,其值可以是datetime.datetime
或time.time
。这里9876
是消息的哈希值,类似于hash('hello')
。不要存储消息本身。[适用于多个工作示例]这可以通过一个原子操作来实现,这样可以避免竞态条件。使用
set
,它的有效期为24小时,因为消息需要在24小时后发送,或者如果它不在Redis中,这可以用来存储任意字符串,并将它在Redis中的存在作为一个标志:if object.shussui and status is not None
可以简化为if object.shussui and status
。user_id
、msg_hash
、sent_at
的表last_sent
。[适用于多个工作示例]此外,根据您的需要和情况,您需要决定如何处理争用条件。例如,同一消息在一行中触发两次,并且在第一次更新“last_sent”之前触发。