rabbitmq 如何在Celery中不同时启动app.tasks?

vsnjm48y  于 2023-10-20  发布在  RabbitMQ
关注(0)|答案(2)|浏览(111)

经过多次测试和搜索,我没有得到结果,我希望你能指导我。
我的代码可在this GitHub地址。
由于主要代码的复杂性,我写了一个简单的代码示例,并将其链接到上述地址。
我有一个包含四个app.tasks的worker,它们的名称如下:

  • app_1000
  • app_1002
  • app_1004
  • app_1006

并且每个app.tasks应该只同时执行一次,也就是说,比如app_1000不应该同时执行两三次,而应该每次只执行一次,如果app_1000的当前任务完成了,就可以转到下一个作业。
Celery配置:

broker_url='amqp://guest@localhost//'
result_backend='rpc://'
include=['celery_app.tasks']
worker_prefetch_multiplier = 1

task_routes={
    'celery_app.tasks.app_1000':{'queue':'q_app'},
    'celery_app.tasks.app_1002':{'queue':'q_app'},
    'celery_app.tasks.app_1004':{'queue':'q_app'},
    'celery_app.tasks.app_1006':{'queue':'q_app'},
    'celery_app.tasks.app_timeout':{'queue':'q_timeout'},
}

正如你所看到的,worker_prefetch_multiplier = 1是上面的配置。
我使用fastapi发送请求,示例请求如下(为了简化问题,我只通过fastapi发送这个worker必须执行的任务数)

我还使用花脚本来检查任务。
在Postman中按下Send按钮后,所有这20个假设任务都被发送到Worker,起初一切都很好,因为每个app.tasks都启动了一个任务。

但几分钟后,当事情向前发展时,app.tasks同时执行,也就是说,例如,根据照片,app_1000已经启动了两次,或者在下一张照片中,app_1006已经启动了两次,它们同时运行,我不打算这样做。案件发生。

片刻后:

我希望app_1000或app_1006一次只做一件事,但我不知道如何做到这一点。
重要提示:请不要建议为4个应用程序创建4个队列。任务,因为在我的真实的项目中,我有100多个应用程序。任务,管理所有这些队列是非常困难的。

可能会出现一个问题,例如,为什么app_1000不应该同时执行?这个问题的答案非常复杂,我们必须解释太多的主要代码,所以请跳过这个问题。
代码是in GitHub(代码的体积很小,不会占用你太多的时间),如果你想运行它,你可以输入以下命令:

celery -A celery_app  worker -Q q_app --loglevel=INFO --concurrency=4 -n  worker@%h 
celery flower --port=5566
uvivorn api:app --reload

谢谢你

yuvru6vn

yuvru6vn1#

不幸的是,celery没有提供任何开箱即用的解决方案。您必须实现分布式缓存锁定机制,并在执行任务之前进行检查。类似的问题和相关的答案是here

qq24tv8q

qq24tv8q2#

你可以尝试在任务中使用锁,比如:

@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def app_1000(self, arg):
    # Using lock to ensure there's no other task (you can include args on lock_id)
    lock_id = 'lock-1000'
    with cache_lock(lock_id, self.app.oid) as acquired:

        # Skip if there's another task
        if not acquired:
            logger.debug("Skip task: There's another task doing the same.")
            return False

        # your task here ...

其中cache_lock是:

from django.core.cache import cache

@contextmanager
def cache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

相关问题