经过多次测试和搜索,我没有得到结果,我希望你能指导我。
我的代码可在this GitHub地址。
由于主要代码的复杂性,我写了一个简单的代码示例,并将其链接到上述地址。
我有一个包含四个app.tasks的worker,它们的名称如下:
- app_1000
- app_1002
- app_1004
- app_1006
并且每个app.tasks应该只同时执行一次,也就是说,比如app_1000
不应该同时执行两三次,而应该每次只执行一次,如果app_1000
的当前任务完成了,就可以转到下一个作业。
Celery配置:
broker_url='amqp://guest@localhost//'
result_backend='rpc://'
include=['celery_app.tasks']
worker_prefetch_multiplier = 1
task_routes={
'celery_app.tasks.app_1000':{'queue':'q_app'},
'celery_app.tasks.app_1002':{'queue':'q_app'},
'celery_app.tasks.app_1004':{'queue':'q_app'},
'celery_app.tasks.app_1006':{'queue':'q_app'},
'celery_app.tasks.app_timeout':{'queue':'q_timeout'},
}
正如你所看到的,worker_prefetch_multiplier = 1
是上面的配置。
我使用fastapi发送请求,示例请求如下(为了简化问题,我只通过fastapi发送这个worker必须执行的任务数)
我还使用花脚本来检查任务。
在Postman中按下Send按钮后,所有这20个假设任务都被发送到Worker,起初一切都很好,因为每个app.tasks都启动了一个任务。
但几分钟后,当事情向前发展时,app.tasks同时执行,也就是说,例如,根据照片,app_1000
已经启动了两次,或者在下一张照片中,app_1006
已经启动了两次,它们同时运行,我不打算这样做。案件发生。
片刻后:
我希望app_1000或app_1006一次只做一件事,但我不知道如何做到这一点。
重要提示:请不要建议为4个应用程序创建4个队列。任务,因为在我的真实的项目中,我有100多个应用程序。任务,管理所有这些队列是非常困难的。
可能会出现一个问题,例如,为什么app_1000不应该同时执行?这个问题的答案非常复杂,我们必须解释太多的主要代码,所以请跳过这个问题。
代码是in GitHub(代码的体积很小,不会占用你太多的时间),如果你想运行它,你可以输入以下命令:
celery -A celery_app worker -Q q_app --loglevel=INFO --concurrency=4 -n worker@%h
celery flower --port=5566
uvivorn api:app --reload
谢谢你
2条答案
按热度按时间yuvru6vn1#
不幸的是,celery没有提供任何开箱即用的解决方案。您必须实现分布式缓存锁定机制,并在执行任务之前进行检查。类似的问题和相关的答案是here。
qq24tv8q2#
你可以尝试在任务中使用锁,比如:
其中
cache_lock
是: