Passing Audio Files To Celery Task

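I have a music upload application, and I believe it would be wise to hand the files to a Celery task to handle the upload. However, when I try to pass the files, as shown in the code below, I get a message saying they are not JSON serializable. What is the correct way to handle this?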

In views.py, everything below the upload_songs call is my current code, which successfully uploads the tracks. However, it does not use Celery yet.

.task.py

import logging

from django.contrib.auth import get_user_model
from Beyond_April_Base_Backend.celery import app
from django.contrib.auth.models import User

@app.task
def upload_songs(songs, user_id):
    try:
        user = User.objects.get(pk=user_id)
        print('user and songs')
        print(user)
        print(songs)
    except User.DoesNotExist:
        logging.warning("Tried to find non-existing user '%s'", user_id)

.views.py

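from datetime import datetime

import music_tag
from django.core.exceptions import ObjectDoesNotExist
from rest_framework import permissions, status
from rest_framework.response import Response
from rest_framework.views import APIView

# Band, Concert, Song and upload_songs come from the project's own modules
# (their import paths are not shown in the original post).

class ConcertUploadView(APIView):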
    permission_classes = [permissions.IsAuthenticated]

    def post(self, request):
        track_files = request.FILES.getlist('files')
        current_user = self.request.user
        upload_songs.delay(track_files, current_user.pk)
        try:
            selected_band = Band.objects.get(name=request.data['band'])
        except ObjectDoesNotExist:
            print('band not received from form')
            selected_band = Band.objects.get(name='Band')
        venue_name = request.data['venue']
        concert_date_str = request.data['concertDate']
        concert_date_split = concert_date_str.split('(')[0]
        concert_date = datetime.strptime(concert_date_split, '%a %b %d %Y %H:%M:%S %Z%z ')
        concert_city = request.data['city']
        concert_state = request.data['state']
        concert_country = request.data['country']

        new_concert = Concert(
            venue=venue_name,
            date=concert_date,
            city=concert_city,
            state=concert_state,
            country=concert_country,
            band=selected_band,
            user=current_user,
        )
        new_concert.save()
    
        # enumerate() replaces the manual counter; concert_order is 1-based.
        for i, song in enumerate(track_files):
            audio_metadata = music_tag.load_file(song.temporary_file_path())
            song_title = str(audio_metadata['title'])
            audio_file_instance = Song(
                title=song_title,
                concert=new_concert,
                user=current_user,
                concert_order=i + 1,
                audio_file=song,
            )
            audio_file_instance.save()
        return Response(status=status.HTTP_201_CREATED)

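When you create a Celery task, its arguments are serialized so that the message can be stored in the queue backend (RabbitMQ, Redis, etc.). The default serializer is JSON, and binary files are not JSON serializable. See Celery's serialization docs for details.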
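To see the limitation in isolation, here is a minimal sketch that uses only the standard library's json module, with no Celery or Django involved. The helper name is_json_serializable is made up for this illustration; the point is only that plain references such as paths and ids serialize, while raw bytes and file-like objects do not.

```python
import io
import json


def is_json_serializable(value):
    """Return True if json.dumps accepts the value, False otherwise."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


# A reference to a file (a path plus a user id) is plain data and serializes.
print(is_json_serializable({"path": "/tmp/song.mp3", "user_id": 1}))

# Raw bytes, which is what an audio file contains, are rejected.
print(is_json_serializable(b"\xff\xfb\x90\x00"))

# So is a file-like object, used here as a stand-in for an uploaded file.
print(is_json_serializable(io.BytesIO(b"\xff\xfb\x90\x00")))
```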

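You could base64-encode the binary file as text, but you shouldn't: it increases the size of the data, and you may end up passing very large messages. With many large messages you may run out of memory or disk space on your backend, and large encoded payloads make it hard to inspect or log messages.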
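The size increase is easy to measure. The sketch below is only an illustration with random bytes standing in for audio data; the function name base64_size is hypothetical. Base64 emits 4 output bytes for every 3 input bytes, so the payload grows by roughly a third before any JSON framing is added.

```python
import base64
import os


def base64_size(n_bytes):
    """Return the length of the base64 encoding of n_bytes random bytes."""
    raw = os.urandom(n_bytes)  # stand-in for the contents of an audio file
    return len(base64.b64encode(raw))


raw_size = 3_000_000  # about 3 MB
encoded_size = base64_size(raw_size)
print(raw_size, encoded_size, encoded_size / raw_size)
```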

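Instead, you should store the binary file somewhere and pass a reference to it (file name, S3 URL, database key, etc.) to the task. The task can then load the file, do whatever it needs to do, and then delete the original file (if appropriate).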
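A framework-free sketch of that pattern might look like the following. It is not the asker's code: stash_upload and process_songs are hypothetical names, and it assumes the web process and the Celery worker can both see the same directory (if they cannot, shared storage such as S3 would play the same role). In the real project, process_songs would carry the @app.task decorator and the view would call process_songs.delay(paths, current_user.pk).

```python
import io
import os
import shutil
import tempfile
import uuid


def stash_upload(file_obj, upload_dir, original_name):
    """Copy an uploaded file-like object into upload_dir and return its path.

    The returned string is what gets passed to the task instead of the file.
    """
    os.makedirs(upload_dir, exist_ok=True)
    # Prefix with a random id so two uploads with the same name do not collide.
    safe_name = f"{uuid.uuid4().hex}_{os.path.basename(original_name)}"
    path = os.path.join(upload_dir, safe_name)
    with open(path, "wb") as out:
        shutil.copyfileobj(file_obj, out)
    return path


def process_songs(paths, user_id):
    """Task body: receives only JSON-friendly arguments (strings and an int)."""
    sizes = []
    for path in paths:
        with open(path, "rb") as f:
            data = f.read()  # placeholder for the real work (read tags, save a Song)
        sizes.append(len(data))
        os.remove(path)  # delete the staged copy once it has been handled
    return sizes


# Usage with an in-memory file standing in for one entry of request.FILES.
upload_dir = tempfile.mkdtemp()
paths = [stash_upload(io.BytesIO(b"fake audio"), upload_dir, "track1.mp3")]
print(process_songs(paths, user_id=1))
```

Copying the upload to a location you control matters because Django's temporary upload files are generally cleaned up when the request finishes, which can happen before the worker picks up the task.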