在Django自定义FileField的保存方法中卸载大量计算

c3frrgcw  于 2022-11-19  发布在  Go
关注(0)|答案(1)|浏览(116)

我正在基于Django(4.1.1)和Vue构建一个图库web应用。我还想上传和显示视频(不仅仅是图片)。对于支持的格式,在视频html标签中不起作用,我正在通过pyffmpeg将这些格式转换为mp4
为此,我基于FileField为我的模型创建了一个自定义字段。在它的save方法中,我获取文件内容,转换它并保存结果。这是由序列化程序通过相应的ViewSet调用的。这是可行的,但视频转换花费的时间太长,以至于来自我的Vue应用程序(使用axios执行)的webrequest运行到超时。
很明显,我需要以某种方式卸载转换,直接返回相应的响应,并在转换完成后立即将数据保存在数据库中。
这是可能的吗?或者我需要写一个自定义的视图,而不是ViewSet来进行计算?你能给予我一个提示,告诉我如何卸载计算吗?我对asyncio之类的东西只有初步的了解。

**TL;DR:**如何在将文件数据保存到FileField模型之前异步执行大量计算,并在计算结束前返回响应?

如果需要,我可以提供我当前的代码。

q0qdq0h2

q0qdq0h21#

我现在已经解决了我的问题,虽然我仍然对其他/更好的解决方案感兴趣。我的解决方案工作,但可能不是最好的,我觉得它是一个有点hacky在一些地方。

**TL;DR:**安装了django-q作为任务队列管理器和一个redis数据库后端,将其连接到django,然后通过

taskid = async_task("apps.myapp.services.transcode_video", data)

这应该是一个健壮的系统,以并行地处理这些代码转换任务,而不会阻塞请求。
我找到了this tutorial about Django-Q。Django-Q管理和执行来自Django的任务。它与Django并行运行,并通过它的代理(在本例中是一个redis数据库)连接到Django。
首先我通过pip安装了django-q和redis客户端模块

pip install django-q redis

然后我建立了一个Redis数据库(在我的机器上的一个docker容器中运行,带有官方的redis图像)。
然后将Django配置为使用Django-Q,方法是将配置添加到settings.py(注意,我禁用了超时,因为转码任务可能会花费相当长的时间。以后可能会更改):

Q_CLUSTER = {
    'name': 'django_q_django',
    'workers': 8,
    'recycle': 500,
    'timeout': None,
    'compress': True,
    'save_limit': 250,
    'queue_limit': 500,
    'cpu_affinity': 1,
    'label': 'Django Q',
    'redis': {
        'host': 'redishostname',
        'port': 6379,
        'password': 'mysecureredisdbpassword',
        'db': 0, }
}

然后通过将Django-Q添加到www.example.com中已安装的应用程序来激活Django-Qsettings.py:

INSTALLED_APPS = [
    ...
    'django_q',
]

然后通过以下方式迁移Django Q的模型定义:

python manage.py migrate

并通过以下方式启动Django Q(Redis数据库应该会在此时运行):

python manage.py qcluster

这是在一个独立的终端运行的典型

python manage.py runserver

注意:当然这两个只是为了开发。我目前还不知道如何在生产中部署Django Q。
现在我们需要一个函数文件。在教程中,我将文件services.py添加到我的应用程序中。在那里,我简单地定义了要运行的函数:

def transcode_video(data):
    # Doing my transcoding stuff here
    return {'entryid': entry.id, 'filename': target_name}

然后可以通过以下方式在视图代码中调用此函数:

taskid = async_task("apps.myapp.services.transcode_video", data)

因此,我可以向函数提供数据,并获得一个任务ID作为返回值。已执行函数的返回值将出现在所创建任务的result字段中,这样,您甚至可以从那里返回数据。
我在那个阶段遇到了一个问题:data包含一个TemporaryUploadedFile对象,这导致了pickle错误。数据在传递给Django Q之前似乎被pickle了,这对那个对象不起作用。可能有一种方法可以将文件转换为picklable格式,尽管我已经需要文件系统上的文件来调用pyffmeg,在视图中,我只是将数据写入一个文件(以块为单位,以避免一次将整个文件加载到内存中

with open(filepath, 'wb') as f:
    for chunk in self.request.data['file'].chunks():
        f.write(chunk)

通常在ViewSet中,我会在最后调用serializer.save(),但对于转码,我不会这样做,因为新对象在事务之后会保存在Django Q函数中。(UploadedFile来自dango.core.files.uploadedfileAlbumEntry是我自己的模型,我想为它创建一个示例)

with open(target_path, 'rb') as f:
    file = UploadedFile(
        file=f,
        name=target_name,
        content_type=data['file_type']+"/"+data['target_ext'],
    )
    entry = AlbumEntry(
        file=file,
        ... other Model fields here)
    entry.save()

为了从视图集中返回一个已定义的响应,即使对象还没有创建,除了perform_create()方法(我在那里做了所有的处理)之外,我还必须覆盖create()方法。为此,我从父类中复制了代码,并对其进行了轻微的修改,以根据perform_create()的返回值返回一个特定的响应(以前没有返回任何东西):

def create(self, request, *args, **kwargs):
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    taskid = self.perform_create(serializer)
    if taskid:
        return HttpResponse(json.dumps({'taskid': taskid, 'status': 'transcoding'}), status=status.HTTP_201_CREATED)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

因此,perform_create()将返回转码作业的任务ID,否则返回None
最后但并非最不重要的一个问题是前端不知道代码转换是什么时候完成的。所以我构建了一个简单的视图来通过ID获取任务:

@api_view(['GET'])
@authentication_classes([authentication.SessionAuthentication])
@permission_classes([permissions.IsAuthenticated])
def get_task(request, task_id):
    task = Task.get_task(task_id)
    if not task:
        return HttpResponse(json.dumps({
            'success': False
        }))
    return HttpResponse(json.dumps({
        'id': task.id,
        'result': task.result,
        ...some more data to return}))

您可以看到,当没有找到任务时,我返回了一个固定的响应。这是我的解决方案,因为默认情况下,只有当任务完成时才会创建Task对象。对于我的目的,可以假设它仍然在运行。this github issue of Django Q中的一个注解建议,要获得最新的Task对象,您需要编写自己的Task模型,并以某种方式实现它。它会定期检查Django Q的任务状态。我不想这么做。
我还把结果放在响应中,这样我的前端就可以定期轮询任务(通过任务ID),当转码完成时,它将包含数据库中创建的Model对象的ID。当我的前端看到这个ID时,它将加载对象的内容。

相关问题