我有一个celery 任务,我想使用任务ID了解任务的状态。我已经阅读了前面的答案,但无法使其工作。我使用了命令
celery -A proj inspect query_task e9f6c8f0-fec9-4ae8-a8c6-cf8c8451d4f8pe
www.example.comcelery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "coutoEditor.settings")
app = Celery("coutoEditor")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
www.example.comsettings.py
CELERY_BROKER_URL = "redis://localhost:6379"
CELERY_RESULT_BACKEND = "redis://localhost:6379"
www.example.comtasks.py
@shared_task()
def speed_up_vid_task(input_path, speed_factor, start, end):
'''
Method Params:
input_path: The video file url or directory path file name
speed_factor: The factor for the speed up process
start: start of the video part that needs to be sped up (in secs)
end: end of the video part that needs to be sped up (in secs)
'''
start = convert_to_sec(start)
end = convert_to_sec(end)
filename = str(uuid.uuid4())
print(filename, "new")
temporary_dir = BASE_DIR + '/' + editor_speedUp_temp_dir # editor_temp_dir = media/editor/speed_vid/temp/"
output_dir = BASE_DIR + '/' + editor_speedUp_output_dir # editor_speedUp_output_dir = media/editor/speed_vid/
# Check for broken url
r = requests.get(input_path, stream=True)
if not r.status_code == 200:
return Response({
'message': "media file is corrupted",
'data': "broken url process could not be completed",
'status': False
}, status=status.HTTP_400_BAD_REQUEST)
if not os.path.exists(output_dir):
os.mkdir(output_dir)
if not os.path.exists(temporary_dir):
os.mkdir(temporary_dir)
stream = os.popen(
"ffmpeg.ffprobe -loglevel error -select_streams a -show_entries stream=codec_type -of csv=p=0 '{}'".format(
input_path))
output = stream.read()
if len(output) == 0:
input_path_vid = os.path.join(BASE_DIR, temporary_dir) + filename + "_temp_video.mp4"
cmd = "ffmpeg -f lavfi -i anullsrc=channel_layout=stereo:sample_rate=44100 -i '{}' -c:v copy -c:a aac -shortest {}".format(
input_path, input_path_vid)
os.system(cmd)
else:
# check if it's a directory or a url
if(os.path.isfile(input_path)):
input_path_vid = BASE_DIR + input_path
pass
else:
ext_name = filename + '_temp_video.mp4'
ext_path = temporary_dir + ext_name
r = requests.get(input_path)
with open(ext_path, 'wb') as outfile:
outfile.write(r.content)
outfile.close()
input_path_vid = ext_path
output_path = os.path.join(BASE_DIR, editor_speedUp_output_dir + filename + ".mp4")
cmd = 'ffmpeg -i ' + input_path_vid + ' \
-filter_complex \
"[0:v]trim=0:' + str(start) + ',setpts=PTS-STARTPTS[v1]; \
[0:v]trim=' + str(start) + ':' + str(end) + ',setpts=1/' + str(speed_factor) + '*(PTS-STARTPTS)[v2]; \
[0:v]trim=' + str(end) + ',setpts=PTS-STARTPTS[v3]; \
[0:a]atrim=0:' + str(start) + ',asetpts=PTS-STARTPTS[a1]; \
[0:a]atrim=' + str(start) + ':' + str(end) + ',asetpts=PTS-STARTPTS,atempo=' + str(speed_factor) + '[a2]; \
[0:a]atrim=' + str(end) + ',asetpts=PTS-STARTPTS[a3]; \
[v1][a1][v2][a2][v3][a3]concat=n=3:v=1:a=1" \
-preset superfast -profile:v baseline ' + output_path
os.system(cmd)
generated_video = open(output_path, "rb")
generated_video_file = TemporaryFiles.objects.create(temp_file=File(generated_video, name=filename + ".mp4"),
created_at=datetime.utcnow())
generated_video.close()
if os.path.exists(input_path_vid):
os.remove(input_path_vid)
if os.path.exists(output_path):
os.remove(output_path)
res_dict = {}
res_dict["video_url"] = os.path.join(BASE_URL, generated_video_file.temp_file.url[1:])
return res_dict
www.example.comviews.py
class speed_up_video(APIView):
def post(self,request):
video_url = request.data["video_url"]
speed_factor = request.data["speed_factor"]
start = request.data["start"]
end = request.data["end"]
result_vid = speed_up_vid_task.delay(video_url, speed_factor, start, end)
return Response(result_vid.get())```
我试着在django上实现了celery ,经过一些测试,它工作了,并在celery 终端显示了输出。我想使用任务ID从终端获得任务的状态。Redis和Celery服务器运行良好。我正在学习实现Celery,我被困在获得这一部分上。我正在使用Redis作为代理和后端数据库。我看过一个关于从redis获取任务状态的博客,但是无法实现。如果有其他方法可以查询,请回答。
当我在任务装饰器中设置bind = True时,我的django应用抛出参数错误。
谢谢。
1条答案
按热度按时间r1wp621o1#
Django有django-celery-results有
TaskResult
模型来保存任务的结果。你可以查询检查状态。但是它是在celery 中运行的,你必须等待任务队列,运行完毕才能得到数据。一种方法是循环等待。就像这样: