django 重写Celery任务类以在任务之间共享变量

yiytaume  于 2023-04-07  发布在  Go
关注(0)|答案(2)|浏览(219)

我试图实现一个任务,其中全局变量在两个不同的Celery任务之间共享。为此,我继承了任务类并使用了属性。根据celery文档,当调用新任务时,基类将初始化。我们有一种方法可以在任务之间重用对象吗?我们可以覆盖运行吗()方法从任务?如果我们覆盖运行方法。我们如何注册任务?与celery 使用celery 5.X?尝试序列化对象..任何替代方法将不胜感激。

class handler(Task):

    def __init__(self):
        self.base_obj = ""

    @property
    def global_handler(self):
        return self.global_thread_handler

    @property
    def base_handler(self):
        return self.base_obj

    @app.task(base=handler)
    def test123():
        test123.base_handler = cls1()

    @app.task(base=handler)
    def test456():
        test456.base_handler.method()
6g8kf2rb

6g8kf2rb1#

Celery使用多个进程来异步运行任务。无论您使用什么Task类,它都只能在worker中可见。如果您想在进程之间共享变量,则必须使用multiprocessing特定的对象才能在单个进程之外共享值。:

from celery import Celery, Task
from multiprocessing import Queue

BROKER_URL = 'sqla+sqlite:///celery.db'

app = Celery(
    'tasks',
    broker='sqla+sqlite:///celery.db',
    backend='db+sqlite:///celery_results.db'
)

q = Queue()

@app.task
def set_var(val: str):
    # put is appending new value to Queue every time when we call set_var
    q.put(val)
    print(f'Global variable has been changed to {val}')

@app.task
def get_var():
    if not q.empty():
        # get is popping first value from Queue if Queue is not empty
        val = q.get()
        print(f'Global variable is {val}')
        return val
    else:
        print('Global variable is EMPTY')

如果我们运行:

>>> from tasks import set_var, get_var
>>> get_var.delay()
<AsyncResult: 387d02a2-a993-4ea4-9dc6-37b4a6657f92>
>>> set_var.delay('NEW VALUE')
<AsyncResult: 8b02519f-cd87-4e88-9270-2c621f84b14e>
>>> get_var.delay()
<AsyncResult: 828047b1-5f3a-4980-b922-224d0cfb628d>

日志如下所示:

[2023-03-25 20:38:29,670: INFO/MainProcess] Task tasks.get_var[387d02a2-a993-4ea4-9dc6-37b4a6657f92] received
[2023-03-25 20:38:29,674: WARNING/ForkPoolWorker-1] Global variable is EMPTY
[2023-03-25 20:38:29,723: INFO/ForkPoolWorker-1] Task tasks.get_var[387d02a2-a993-4ea4-9dc6-37b4a6657f92] succeeded in 0.049737373999960255s: None
[2023-03-25 20:38:45,801: INFO/MainProcess] Task tasks.set_var[8b02519f-cd87-4e88-9270-2c621f84b14e] received
[2023-03-25 20:38:45,802: WARNING/ForkPoolWorker-2] Global variable has been changed to NEW VALUE
[2023-03-25 20:38:45,822: INFO/ForkPoolWorker-2] Task tasks.set_var[8b02519f-cd87-4e88-9270-2c621f84b14e] succeeded in 0.02061020500059385s: None
[2023-03-25 20:38:49,846: INFO/MainProcess] Task tasks.get_var[828047b1-5f3a-4980-b922-224d0cfb628d] received
[2023-03-25 20:38:49,848: WARNING/ForkPoolWorker-3] Global variable is NEW VALUE
[2023-03-25 20:38:49,870: INFO/ForkPoolWorker-3] Task tasks.get_var[828047b1-5f3a-4980-b922-224d0cfb628d] succeeded in 0.022652251000181423s: 'NEW VALUE'

可以看到,ForkPoolWorker-3能够读取在ForkPoolWorker-2中设置的值
由于锁和并发问题,你必须小心使用共享变量。

fhg3lkii

fhg3lkii2#

在Celery中注册一个任务可以简单地使用如下方法完成:

# my_app/tasks.py
import celery
from my_app.celery import app

class MyTask(celery.Task):
    def run(self):
        [...]

MyTask = app.register_task(MyTask())

我认为你不可能在任务中重用对象。有人能纠正我吗?

相关问题