python 如何在celery 中获取任务状态

egdjgwm8  于 2023-02-11  发布在  Python
关注(0)|答案(1)|浏览(471)

我有一个celery 任务,我想使用任务ID了解任务的状态。我已经阅读了前面的答案,但无法使其工作。我使用了命令

celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8pe

command result

www.example.comcelery.py

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "coutoEditor.settings")
app = Celery("coutoEditor")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

www.example.comsettings.py

CELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"

www.example.comtasks.py

@shared_task()
def speed_up_vid_task(input_path, speed_factor, start, end):

    '''
    Method Params:
    input_path: The video file url or directory path file name
    speed_factor: The factor for the speed up process
    start: start of the video part that needs to be sped up (in secs)
    end: end of the video part that needs to be sped up (in secs)
    '''

    start = convert_to_sec(start)
    end = convert_to_sec(end)

    filename = str(uuid.uuid4())
    print(filename, "new")
    temporary_dir = BASE_DIR + '/' + editor_speedUp_temp_dir  # editor_temp_dir = media/editor/speed_vid/temp/"
    output_dir = BASE_DIR + '/' + editor_speedUp_output_dir  # editor_speedUp_output_dir = media/editor/speed_vid/

    # Check for broken url
    r = requests.get(input_path, stream=True)
    if not r.status_code == 200:
        return Response({
            'message': "media file is corrupted",
            'data': "broken url process could not be completed",
            'status': False
        }, status=status.HTTP_400_BAD_REQUEST)

    if not os.path.exists(output_dir):
        os.mkdir(output_dir)

    if not os.path.exists(temporary_dir):
        os.mkdir(temporary_dir)

    stream = os.popen(
        "ffmpeg.ffprobe -loglevel error -select_streams a -show_entries stream=codec_type -of csv=p=0 '{}'".format(
            input_path))
    output = stream.read()
    if len(output) == 0:
        input_path_vid = os.path.join(BASE_DIR, temporary_dir) + filename + "_temp_video.mp4"
        cmd = "ffmpeg -f lavfi -i anullsrc=channel_layout=stereo:sample_rate=44100 -i '{}' -c:v copy -c:a aac -shortest {}".format(
            input_path, input_path_vid)
        os.system(cmd)
    else:
        # check if it's a directory or a url
        if(os.path.isfile(input_path)):
            input_path_vid = BASE_DIR + input_path
            pass
        else:
            ext_name = filename + '_temp_video.mp4'
            ext_path = temporary_dir + ext_name
            r = requests.get(input_path)
            with open(ext_path, 'wb') as outfile:
                outfile.write(r.content)
            outfile.close()
            input_path_vid = ext_path

    output_path = os.path.join(BASE_DIR, editor_speedUp_output_dir + filename + ".mp4")

    cmd = 'ffmpeg -i ' + input_path_vid + ' \
            -filter_complex \
            "[0:v]trim=0:' + str(start) + ',setpts=PTS-STARTPTS[v1]; \
            [0:v]trim=' + str(start) + ':' + str(end) + ',setpts=1/' + str(speed_factor) + '*(PTS-STARTPTS)[v2]; \
            [0:v]trim=' + str(end) + ',setpts=PTS-STARTPTS[v3]; \
            [0:a]atrim=0:' + str(start) + ',asetpts=PTS-STARTPTS[a1]; \
            [0:a]atrim=' + str(start) + ':' + str(end) + ',asetpts=PTS-STARTPTS,atempo=' + str(speed_factor) + '[a2]; \
            [0:a]atrim=' + str(end) + ',asetpts=PTS-STARTPTS[a3]; \
            [v1][a1][v2][a2][v3][a3]concat=n=3:v=1:a=1" \
            -preset superfast -profile:v baseline ' + output_path

    os.system(cmd)

    generated_video = open(output_path, "rb")
    generated_video_file = TemporaryFiles.objects.create(temp_file=File(generated_video, name=filename + ".mp4"),
                                                         created_at=datetime.utcnow())
    generated_video.close()

    if os.path.exists(input_path_vid):
        os.remove(input_path_vid)

    if os.path.exists(output_path):
        os.remove(output_path)

    res_dict = {}
    res_dict["video_url"] = os.path.join(BASE_URL, generated_video_file.temp_file.url[1:])

    return res_dict

www.example.comviews.py

class speed_up_video(APIView):
    def post(self,request):
        video_url = request.data["video_url"]
        speed_factor = request.data["speed_factor"]
        start = request.data["start"]
        end = request.data["end"]
        result_vid = speed_up_vid_task.delay(video_url, speed_factor, start, end)
        return Response(result_vid.get())```

我试着在django上实现了celery ,经过一些测试,它工作了,并在celery 终端显示了输出。我想使用任务ID从终端获得任务的状态。Redis和Celery服务器运行良好。我正在学习实现Celery,我被困在获得这一部分上。我正在使用Redis作为代理和后端数据库。我看过一个关于从redis获取任务状态的博客,但是无法实现。如果有其他方法可以查询,请回答。
当我在任务装饰器中设置bind = True时,我的django应用抛出参数错误。
谢谢。

r1wp621o

r1wp621o1#

Django有django-celery-resultsTaskResult模型来保存任务的结果。你可以查询检查状态。
但是它是在celery 中运行的,你必须等待任务队列,运行完毕才能得到数据。一种方法是循环等待。就像这样:

all_tasks = []
task = speed_up_vid_task.delay(video_url, speed_factor, start, end)
all_tasks.append(task.task_id)
while len(all_tasks) > 0:
    time.sleep(1)
    for task in TaskResult.objects.filter(task_id__in=all_tasks):
        if task.status in ['SUCCESS', 'FAILURE']:
            # do anything you want in this
            all_tasks.remove(task.task_id)

相关问题