Django中如何使用Celery上传文件

How to use Celery to upload files in Django

我想知道如何使用 Celery worker 来处理文件上传。 所以我尝试在一个简单的 class 上实现它。 我在我的 ModelViewSet 中覆盖了 create class。 但显然 Django 的默认 json 编码器不会序列化 ImageFields (Lame)。 如果你们能告诉我如何解决这个问题,我将非常感激。 这是我想出的:

serializers.py:

class ProductImageSerializer(serializers.ModelSerializer):
    class Meta:
        model = ProductImage
        fields = ['id', 'image']

tasks.py:

from time import sleep
from celery import shared_task
from .models import ProductImage

@shared_task:
def upload_image(product_id, image):
    print('Uploading image...')
    sleep(10)
    product = ProductImage(product_id=product_id, image=image)
    product.save()

views.py:

class ProductImageViewSet(ModelViewSet):
    serializer_class = ProductImageSerializer

    def get_queryset(self):
        return ProductImage.objects.filter(product_id=self.kwargs['product_pk'])

    def create(self, request, *args, **kwargs):
        product_id = self.kwargs['product_pk']
        image = self.request.FILES['image']
        image.open()
        image_data = Image.open(image)
        upload_image.delay(product_id, image_data)

        return Response('Thanks')

这是包含我的 ImageField 的我的模型:

class ProductImage(models.Model):
    product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name='images')
    image = models.ImageField(upload_to='store/images', validators=[validate_image_size])

所以我想出了一个办法来做到这一点。 这是我的解决方案:

问题是 celery 的默认 json 编码器无法序列化图像、InMemoryUploadedFile、ModelObjects 和... 所以我们需要给它传递一个 json 可序列化的值。 在这种情况下,我们想要序列化一个图像。 所以我们可以做的是将我们的图像转换为字节,然后将该字节对象转换为字符串,这样我们就可以将它发送到我们的 celery 任务。 在我们的任务中收到字符串后,我们可以将其转换回图像并使用互联网上 celery.Many 人建议的此解决方案上传,但其中 none 人提供了任何代码。 所以这里是上面例子的代码,如果你想看到它的实际效果:

在我的 views.py 中,我使用了 ModelViewSet 并覆盖了创建方法:

def create(self, request, *args, **kwargs):

        image = self.request.FILES['image'].read()

        byte = base64.b64encode(image)
        
        data = {
            'product_id': self.kwargs['product_pk'],
            'image': byte.decode('utf-8'),
            "name": self.request.FILES['image'].name
        }

        upload_image.delay(data=data)

        return Response('Uploading...')

这是我的 tasks.py:

from time import sleep
from celery import shared_task
from .models import ProductImage
import PIL.Image as Image
import io
import base64
import os
from django.core.files import File

@shared_task
def upload_image(data):
    
    print('Uploading image...')
    
    sleep(10)
    
    product_id = data['product_id']

    byte_data = data['image'].encode(encoding='utf-8')
    b = base64.b64decode(byte_data)
    img = Image.open(io.BytesIO(b))
    img.save(data['name'], format=img.format)
    
    with open(data['name'], 'rb') as file:
        picture = File(file)

        instance = ProductImage(product_id=product_id, image=picture)
        instance.save()
    
    os.remove(data['name'])

    print('Uploaded!')

我希望有人觉得这有帮助。 任何人有任何建议请在评论中告诉我。 祝你有美好的一天;)

大家好,我之前针对这个问题发布了一个解决方案,尽管该解决方案工作正常,但我找到了一个更好的解决方案。 使用 base64 编码和解码二进制文件会使它们变大,这不是我们想要的。所以更好的解决方案是将上传的文件暂时保存在磁盘上,将路径传递给我们的celery worker上传并在我们的数据库中创建一个ProductImage实例,然后删除我们保存在磁盘上的文件。

实现方法如下:

tasks.py:

from time import sleep
from celery import shared_task
from .models import ProductImage
from django.core.files import File
from django.core.files.storage import FileSystemStorage
from pathlib import Path

@shared_task
def upload(product_id, path, file_name):

    print('Uploading image...')

    sleep(10)
    
    storage = FileSystemStorage()

    path_object = Path(path)

    with path_object.open(mode='rb') as file:
        
        picture = File(file, name=path_object.name)

        instance = ProductImage(product_id=product_id, image=picture)

        instance.save()


    storage.delete(file_name)

    print('Uploaded!')

在 serializers.py 中,您应该像这样覆盖 ProductImage 序列化程序的创建方法:

    def create(self, validated_data):
        product_id = self.context['product_id']
        image_file = self.context['image_file']
        storage = FileSystemStorage()
        
        image_file.name = storage.get_available_name(image_file)

        storage.save(image_file.name, File(image_file))

        return upload.delay(product_id=product_id, path=storage.path(image_file.name), file_name=image_file.name)

您还应该重写 ProductImage 的 ViewSet 中的创建方法,为您的序列化程序的上下文提供图像文件:

    def create(self, request, *args, **kwargs):
        product_id = self.kwargs['product_pk']
        image_file = self.request.FILES['image']
        serializer = ProductImageSerializer(
            data=request.data,
            context={
                'product_id': product_id,
                'image_file': image_file
            }
        )
        serializer.is_valid(raise_exception=True)
        serializer.save()
        return Response('Upload Started...')