如何将 TemporaryUploadedFile 传递给 celery 任务?

How can I pass TemporaryUploadedFile to celery task?

我有一个代码:

def post(self, request, *args, **kwargs):
    file = request.FILES["import_file"]
    # create a tast with celery and save ID of the task
    task_id = importing.delay(file).id
    return Response({"task_id": task_id}, content_type="application/json")

当 type(file) 为 TemporaryUploadedFile 时出现错误,因为文件无法写入 redis。

我可以取这个临时文件的名字并将这个名字保存到 Redis 中。然后 celery worker 可以从 redis 中获取这个名称并读取文件。但我不确定:能否在 celery worker 从 redis 获取名称之前删除文件?

一旦 request_finished 信号被触发,TemporaryUploadedFile 就会关闭并删除。当您的 Celery 工作人员访问该文件时,该文件很可能不再存在。

您应该将文件复制到持久位置,并在完成后让 Celery 清理文件。

    def close(self):
        try:
            return self.file.close()
        except OSError as e:
            if e.errno != errno.ENOENT:
                # Means the file was moved or deleted before the tempfile
                # could unlink it.  Still sets self.file.close_called and
                # calls self.file.file.close() before the exception
                raise

根据TemporaryUploadedFile的close方法的源代码,如果临时文件被移动,它不会被关闭,所以你可以移动它并将它的新路径传递给celery任务,然后在celery时自己删除它任务完成。 这样,您将节省将文件复制到持久位置的时间和资源。

    import os
    from django.core.files import uploadedfile

    file = request.FILES["import_file"]
    new_path = '/tmp/import_file'
    if isinstance(file, uploadedfile.TemporaryUploadedFile):
        os.rename(file.file.name, new_path)
    else:    # Deal with InMemoryUploadedFile
        with open(new_path, 'wb') as f:
            for chunk in file.chunks():
                f.write(chunk)
    task_id = importing.delay(new_path).id