完成任务后插入celery taskresult

hgc7kmma  于 2021-06-10  发布在  Redis
关注(0)|答案(0)|浏览(285)

我对python中django框架中的celery 任务队列模块有一些奇怪的行为。
当所有变量都正确设置时,我的代码就会工作。但是实现一些独立于celery 的验证代码似乎会干扰celery 。我先解释一下我在编什么程序。我的完整代码在这篇文章的底部。
Jmeter 板用户可以执行通过redis发送给celery 工人的任务。这很管用。接下来,我还成功地实现了celery 进度后端,有了它的progressrecorder,我现在可以设置进度了。一些jquery、ajax、html和css在 Jmeter 板中启用了一个奇特的进度条。
我现在将这个progressrecorder与类中的一些自定义代码组合在一起。这同样有效。通过自定义代码,可以指定哪些python函数定义应该记录进度。我的代码是模块化的,所以函数定义可以在很多级别上重用,并且不是每个函数都应该登录到每个任务中。因此,开发人员应该指定哪些函数应该记录进度。在初始化类时,用户使用“display \u progress \u of \u defs”键执行此操作:

progress_recorder = ProgressRecorder(self)
progress_logging = LogTaskProgressV3(task_id=task_id, app_id=app_id,
                                     progress_recorder=progress_recorder, total_steps=total_steps,
                                     display_progress_of_defs=["function_def_a", "function_def_c"])

在类中,我添加了验证来确认是否设置了“display\u progress\u of \u defs”键。为了测试这一点,我没有设置这个键就运行了代码。任务已执行,但taskresult记录仅在任务完成后插入。从此以后,由于还没有任务的记录,日志记录进度是不可能的。
在完成代码之前,我的问题是:为什么会发生,我如何解决它?
celery 任务:

@shared_task(bind=True)
def go_to_sleep_test(self, duration):
    """ Testing out Celery and the custom progress observer-class. """

    app_id = 'fictional-app'
    total_steps = 3
    # Retrieve task id
    task_id = current_task.request.id

    print(f"Starting 'go_to_sleep_test' with")
    print(f"  app_id        = {app_id}")
    print(f"  task_id       = {task_id}")
    print(f"  total_steps   = {total_steps}")

    # Start logging progress
    progress_recorder = ProgressRecorder(self)
    progress_logging = LogTaskProgressV3(task_id=task_id, app_id=app_id,
                                         progress_recorder=progress_recorder, total_steps=total_steps)
    progress_logging.init_function_def(function_def_name="go_to_sleep_test", total_steps=total_steps)

    # Initial logging
    progress_logging.log_progress(current_step=0,
                                  description="Task started")

    ################
    # PERFORM TASK #
    ################
    sleep(2)
    progress_logging.log_progress(current_step=1,
                                  description="Step 1")
    sleep(2)
    progress_logging.log_progress(current_step=2,
                                  description="Step 2")
    sleep(2)
    progress_logging.log_progress(current_step=3,
                                  description="Step 3")
    sleep(2)

    # Finish task
    return 'Done'

自定义进度观察者类。这与上述任务在同一个文件中。

class LogTaskProgressV3:
    """ Log task progress """

    def __init__(self, task_id="not_given", app_id="not_given", progress_recorder=None, total_steps=100,
                 display_progress_of_defs=[]):

        if display_progress_of_defs is None:
            display_progress_of_defs = []

        self.task_id = task_id
        self.app_id = app_id
        self.function_def_name = "Run init_function_def first."
        self.progress_recorder = progress_recorder
        self.total_steps = total_steps
        self.display_progress_of_defs = display_progress_of_defs

        print(f"self.task_id = {self.task_id}")
        print(f"self.app_id = {self.app_id}")
        print(f"self.function_def_name = {self.function_def_name}")
        print(f"self.progress_recorder = {self.progress_recorder}")
        print(f"self.total_steps = {self.total_steps}")
        print(f"self.display_progress_of_defs = {self.display_progress_of_defs}")

    def init_function_def(self, function_def_name="Function definition name not given.", total_steps=100):
        """ Each function definition has it's total amount of steps. Therefore set at start of function definition. """
        self.function_def_name = function_def_name
        self.total_steps = total_steps
        print(f"self.function_def_name = {self.function_def_name}")
        print(f"self.total_steps = {self.total_steps}")

    def log_progress(self, current_step, description):
        print("Now logging progress.")
        print(f"Confirming progress logging for '{self.function_def_name}' is requested:")
        print(f"self.display_progress_of_defs = {self.display_progress_of_defs}")

        # Without the below validation, the code works. 
        if len(self.display_progress_of_defs) == 0:
            print("Progress logging not requested.")
            return
        elif self.function_def_name not in self.display_progress_of_defs:
            print("Progress logging not requested.")
            return
        print("Progress logging is requested.")
        # End of validation. 

        # Log progress in celery model
        self.progress_recorder.set_progress(current=current_step,
                                            total=self.total_steps,
                                            description=description)

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题