Passing Audio Files To Celery Task
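I have a music uploading app, and I believe it would be wise to pass the files to a Celery task to handle the upload. However, when I try to pass the files, as shown in the code below, I get a message stating that they are not JSON serializable. What is the proper way to handle this?
Everything below the upload_songs call in .views.py is my current code, which uploads the audio tracks successfully. However, it does not use Celery yet.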
.task.py
import logging

from django.contrib.auth import get_user_model
from Beyond_April_Base_Backend.celery import app
from django.contrib.auth.models import User


@app.task
def upload_songs(songs, user_id):
    # For now the task only looks up the user and prints what it received.
    try:
        user = User.objects.get(pk=user_id)
        print('user and songs')
        print(user)
        print(songs)
    except User.DoesNotExist:
        logging.warning("Tried to find non-existing user '%s'", user_id)
.views.py
from datetime import datetime

import music_tag
from django.core.exceptions import ObjectDoesNotExist
from rest_framework import permissions, status
from rest_framework.response import Response
from rest_framework.views import APIView

# Band, Concert, Song and upload_songs come from the project's own modules (imports not shown).


class ConcertUploadView(APIView):
    permission_classes = [permissions.IsAuthenticated]

    def post(self, request):
        track_files = request.FILES.getlist('files')
        current_user = self.request.user
        # This is the call that fails: the uploaded file objects are not JSON serializable.
        upload_songs.delay(track_files, current_user.pk)
        try:
            selected_band = Band.objects.get(name=request.data['band'])
        except ObjectDoesNotExist:
            print('band not received from form')
            selected_band = Band.objects.get(name='Band')
        venue_name = request.data['venue']
        concert_date_str = request.data['concertDate']
        # Drop the trailing "(timezone name)" part before parsing the date.
        concert_date_split = concert_date_str.split('(')[0]
        concert_date = datetime.strptime(concert_date_split, '%a %b %d %Y %H:%M:%S %Z%z ')
        concert_city = request.data['city']
        concert_state = request.data['state']
        concert_country = request.data['country']
        new_concert = Concert(
            venue=venue_name,
            date=concert_date,
            city=concert_city,
            state=concert_state,
            country=concert_country,
            band=selected_band,
            user=current_user,
        )
        new_concert.save()
        # Create one Song per uploaded file, numbered in upload order.
        for i, song in enumerate(track_files):
            audio_metadata = music_tag.load_file(song.temporary_file_path())
            song_title = str(audio_metadata['title'])
            audio_file_instance = Song(
                title=song_title,
                concert=new_concert,
                user=current_user,
                concert_order=i + 1,
                audio_file=song,
            )
            audio_file_instance.save()
        return Response(status=status.HTTP_201_CREATED)
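When you create a Celery task, its arguments are serialized so that the message can be stored in the queue backend (RabbitMQ, Redis, etc.). The default serializer is JSON, and binary files are not JSON serializable. See Celery's serialization docs for details.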
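To see why the call fails, here is a minimal sketch that uses the standard library's json module as a stand-in for Celery's JSON serializer, and an io.BytesIO object as a stand-in for an uploaded file. Both stand-ins and the helper name is_json_serializable are assumptions made for illustration; they are not part of the original code.

```python
import io
import json


def is_json_serializable(value):
    """Return True if json.dumps accepts the value, False if it raises TypeError."""
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False


# Hypothetical stand-in for one uploaded audio file.
fake_upload = io.BytesIO(b"\x00\x01fake-audio-data")

# A primary key and a file path are plain JSON types, so they can be queued.
print(is_json_serializable({"user_id": 42, "path": "/tmp/song.mp3"}))

# A file-like object is not a JSON type, so the encoder rejects it.
print(is_json_serializable({"songs": [fake_upload]}))
```

The second call fails for the same reason the list of uploaded files does: the encoder only knows how to write strings, numbers, booleans, None, lists and dicts.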
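You could base64-encode the binary file into text, but you shouldn't: it increases the size of the data by roughly a third, and you would potentially be passing very large messages. With many large messages you can run out of memory or disk space in your backend, and large payloads make it difficult to inspect or log messages.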
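The size cost of base64 is easy to measure. The sketch below uses a buffer of zero bytes as a hypothetical stand-in for an audio file; the 3 MB figure is chosen only to keep the arithmetic simple.

```python
import base64

# 3,000,000 zero bytes standing in for an audio file (hypothetical data).
raw = bytes(3_000_000)

# base64 emits 4 output characters for every 3 input bytes.
encoded = base64.b64encode(raw)

print(len(raw), len(encoded))
print(len(encoded) / len(raw))
```

Every message carrying a file this way would be about a third larger than the file itself, and the broker would have to hold all of it until a worker picks the task up.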
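Instead, you should store the binary file somewhere and pass a reference (filename, S3 URL, database key, etc.) to the task. The task can then load the file, do what it needs to do, and then delete the original file if appropriate.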
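A minimal sketch of that pattern, written with the standard library only so it runs without Celery or Django. The names save_upload and process_song are hypothetical, and process_song is a plain function standing in for the task body; with Celery it would presumably be decorated with @app.task and invoked as process_song.delay(path, user_id).

```python
import os
import tempfile


def save_upload(data: bytes, directory: str, name: str) -> str:
    """Write the uploaded bytes to storage and return the path (the reference)."""
    path = os.path.join(directory, name)
    with open(path, "wb") as fh:
        fh.write(data)
    return path


def process_song(path: str, user_id: int) -> int:
    """Stand-in for the task body: load the file by reference, work on it, clean up.

    Both arguments are a string and an integer, so they are JSON-safe.
    """
    with open(path, "rb") as fh:
        size = len(fh.read())  # placeholder for the real processing
    os.remove(path)  # delete the original once it is no longer needed
    return size


# In the view: store the file first, then hand only the path to the task.
upload_dir = tempfile.mkdtemp()
song_path = save_upload(b"fake-audio-bytes", upload_dir, "track01.mp3")
result = process_song(song_path, user_id=1)
print(result, os.path.exists(song_path))
```

For this to work with a real worker, the storage location has to be reachable from the worker process as well as the web process, which is why shared disk, S3, or a database record are the usual choices.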