我对python中django框架中的celery 任务队列模块有一些奇怪的行为。
当所有变量都正确设置时,我的代码就会工作。但是实现一些独立于celery 的验证代码似乎会干扰celery 。我先解释一下我在编什么程序。我的完整代码在这篇文章的底部。
Jmeter 板用户可以执行通过redis发送给celery 工人的任务。这很管用。接下来,我还成功地实现了celery 进度后端,有了它的progressrecorder,我现在可以设置进度了。一些jquery、ajax、html和css在 Jmeter 板中启用了一个奇特的进度条。
我现在将这个progressrecorder与类中的一些自定义代码组合在一起。这同样有效。通过自定义代码,可以指定哪些python函数定义应该记录进度。我的代码是模块化的,所以函数定义可以在很多级别上重用,并且不是每个函数都应该登录到每个任务中。因此,开发人员应该指定哪些函数应该记录进度。在初始化类时,用户使用“display \u progress \u of \u defs”键执行此操作:
progress_recorder = ProgressRecorder(self)
progress_logging = LogTaskProgressV3(task_id=task_id, app_id=app_id,
progress_recorder=progress_recorder, total_steps=total_steps,
display_progress_of_defs=["function_def_a", "function_def_c"])
在类中,我添加了验证来确认是否设置了“display\u progress\u of \u defs”键。为了测试这一点,我没有设置这个键就运行了代码。任务已执行,但taskresult记录仅在任务完成后插入。从此以后,由于还没有任务的记录,日志记录进度是不可能的。
在完成代码之前,我的问题是:为什么会发生,我如何解决它?
celery 任务:
@shared_task(bind=True)
def go_to_sleep_test(self, duration):
""" Testing out Celery and the custom progress observer-class. """
app_id = 'fictional-app'
total_steps = 3
# Retrieve task id
task_id = current_task.request.id
print(f"Starting 'go_to_sleep_test' with")
print(f" app_id = {app_id}")
print(f" task_id = {task_id}")
print(f" total_steps = {total_steps}")
# Start logging progress
progress_recorder = ProgressRecorder(self)
progress_logging = LogTaskProgressV3(task_id=task_id, app_id=app_id,
progress_recorder=progress_recorder, total_steps=total_steps)
progress_logging.init_function_def(function_def_name="go_to_sleep_test", total_steps=total_steps)
# Initial logging
progress_logging.log_progress(current_step=0,
description="Task started")
################
# PERFORM TASK #
################
sleep(2)
progress_logging.log_progress(current_step=1,
description="Step 1")
sleep(2)
progress_logging.log_progress(current_step=2,
description="Step 2")
sleep(2)
progress_logging.log_progress(current_step=3,
description="Step 3")
sleep(2)
# Finish task
return 'Done'
自定义进度观察者类。这与上述任务在同一个文件中。
class LogTaskProgressV3:
""" Log task progress """
def __init__(self, task_id="not_given", app_id="not_given", progress_recorder=None, total_steps=100,
display_progress_of_defs=[]):
if display_progress_of_defs is None:
display_progress_of_defs = []
self.task_id = task_id
self.app_id = app_id
self.function_def_name = "Run init_function_def first."
self.progress_recorder = progress_recorder
self.total_steps = total_steps
self.display_progress_of_defs = display_progress_of_defs
print(f"self.task_id = {self.task_id}")
print(f"self.app_id = {self.app_id}")
print(f"self.function_def_name = {self.function_def_name}")
print(f"self.progress_recorder = {self.progress_recorder}")
print(f"self.total_steps = {self.total_steps}")
print(f"self.display_progress_of_defs = {self.display_progress_of_defs}")
def init_function_def(self, function_def_name="Function definition name not given.", total_steps=100):
""" Each function definition has it's total amount of steps. Therefore set at start of function definition. """
self.function_def_name = function_def_name
self.total_steps = total_steps
print(f"self.function_def_name = {self.function_def_name}")
print(f"self.total_steps = {self.total_steps}")
def log_progress(self, current_step, description):
print("Now logging progress.")
print(f"Confirming progress logging for '{self.function_def_name}' is requested:")
print(f"self.display_progress_of_defs = {self.display_progress_of_defs}")
# Without the below validation, the code works.
if len(self.display_progress_of_defs) == 0:
print("Progress logging not requested.")
return
elif self.function_def_name not in self.display_progress_of_defs:
print("Progress logging not requested.")
return
print("Progress logging is requested.")
# End of validation.
# Log progress in celery model
self.progress_recorder.set_progress(current=current_step,
total=self.total_steps,
description=description)
暂无答案!
目前还没有任何答案,快来回答吧!