在 Django 的 post_save 信号中使用 RabbitMQ 和 Celery
Using RabbitMQ with Celery in Django post_save
我正在我的 Django 项目中使用 Celery 进行异步任务处理。
我的项目逻辑如下:
- 前端有一个表格(table),每一行都有一个 upload 按钮。用户点击后,会向后端发送一个 payload,其中包含一个指向文件的 URL。
- Django 视图接收该文件(URL),并将其存入数据库表 `Run`。对象一经保存,post_save 信号就会立即触发,运行一个 Celery 任务。
- 该任务会获取具有特定状态的 Run 列表,并对每个 Run 执行下载文件的操作。
如果有多个 Run,我希望异步执行这些下载。请注意,用户可以在前端对不止一行点击上传。
我正在将 RabbitMQ 设置为消息代理(broker)。我已安装 RabbitMQ,并且它正在运行。我还在 settings.py 中设置了 `CELERY_BROKER_URL='amqp://localhost'`。我对接下来该如何配置有点迷茫,能否给我一些指导?我想我需要为我的任务配置 Celery worker。
以下是我目前的代码:
views.py #保存到数据库的视图
class RunsUploadView(APIView):
    """Persist an uploaded run and echo it back to the client.

    The new ``Run`` is built from the request payload (``crawler`` and
    ``run_url``) plus the ``run_id`` URL keyword argument. Saving it sends
    Django's ``post_save`` signal for ``Run``.
    """

    serializer_class = RunsURLSerializer

    def post(self, request, *args, **kwargs):
        """Create and save a ``Run``; respond with its field values as a dict."""
        payload = self.request.data
        crawler = payload.get('crawler')
        run_key = self.kwargs.get("run_id")
        new_run = Run(
            name=f"{crawler}_{run_key}",
            run=run_key,
            url=payload.get("run_url"),
        )
        new_run.save()
        return Response(model_to_dict(new_run))
models.py # Run 保存到表 Run 中,然后触发 post_save 信号。
from django.db import models
class Run(models.Model):
    """One uploaded run: a named run whose file is fetched from ``url``."""

    # Django choices: (stored value, human-readable label) pairs for ``status``.
    UPLOAD_STATUS = (
        ("Pending", "pending"),
        ("Running", "running"),
        ("Success", "success"),
        ("Failed", "failed"),
    )
    # Display name; the upload view builds it as "<crawler>_<run_id>".
    name = models.CharField(max_length=100)
    # External run identifier; unique=True, so the same run cannot be stored twice.
    run = models.CharField(max_length=100, unique=True)
    # Location of the run's file.
    url = models.URLField(max_length=1000)
    # Lifecycle state, one of the UPLOAD_STATUS stored values; new rows start as "Pending".
    status = models.CharField(
        max_length=50, choices=UPLOAD_STATUS, default="Pending")
    # Start/finish timestamps; nullable (NULL until set).
    # NOTE(review): nothing in this snippet writes these fields yet.
    started_at = models.DateTimeField(null=True)
    done_at = models.DateTimeField(null=True)
signals.py # 在 save() 之后处理 post_save 逻辑
from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import DownloadRun
@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    """Claim every ``Pending`` Run and download its file.

    Connected to ``post_save`` for ``Run``, so it runs after each
    ``Run.save()``. ``DownloadRun`` is called directly here, i.e. in the
    process that saved the Run.
    """
    for run in Run.objects.filter(status="Pending"):
        # Claim the row with a conditional UPDATE instead of run.save():
        # QuerySet.update() does not send post_save, so this receiver is not
        # re-entered (re-entry made stale in-memory copies download twice).
        # The status="Pending" filter means only the first caller to flip a
        # row gets claimed == 1.
        # "Running" is the in-progress value declared in Run.UPLOAD_STATUS.
        claimed = Run.objects.filter(pk=run.pk, status="Pending").update(status="Running")
        if not claimed:
            continue
        # Keep the in-memory instance in sync with the row just updated.
        run.status = "Running"
        DownloadRun(run)
tasks.py # 使用类来实现,因为之后我还要添加更多功能。
class DownloadRun:
    """Download the file referenced by a ``Run``.

    The download is started from the constructor: ``__init__`` records the
    run's URL and immediately calls :meth:`download_run`.
    """

    def __init__(self, run):
        """Remember *run*'s URL and start the download.

        :param run: the ``Run`` to download (only its ``url`` attribute is read).
        """
        # Read the field directly: model_to_dict() is not imported in this
        # module, and building a dict of every field to get one is unnecessary.
        self.run_url = run.url
        self.download_run()

    def download_run(self, dest_folder="runs"):
        """Run file is downloaded from url."""
        # TODO: implement the download of self.run_url. As written, this method
        # has no body beyond its docstring, and dest_folder is not used yet.
我找到了解决办法:之前是我没有正确配置 Celery。修改如下:任务用 `@shared_task` 声明,并在信号中通过 `delay()` 调用。
tasks.py
from celery import shared_task #import shared_task decorator from celery
class DownloadRun:
    """Download the file referenced by a ``Run``.

    The download is started from the constructor: ``__init__`` records the
    run's URL and immediately calls :meth:`download_run`.
    """

    def __init__(self, run):
        """Remember *run*'s URL and start the download.

        :param run: the ``Run`` to download (only its ``url`` attribute is read).
        """
        # Read the field directly: model_to_dict() is not imported in this
        # module, and building a dict of every field to get one is unnecessary.
        self.run_url = run.url
        self.download_run()

    def download_run(self, dest_folder="runs"):
        """Run file is downloaded from url."""
        # TODO: implement the download of self.run_url. As written, this method
        # has no body beyond its docstring, and dest_folder is not used yet.
@shared_task
def celery_task(run_id):
    """Celery task: download the file of the Run whose ``run`` key is *run_id*.

    The caller passes the run's unique ``run`` value (a plain string) rather
    than the model instance, because task arguments have to be serializable.
    The instance is therefore re-fetched here, inside the worker.

    :param run_id: value of ``Run.run`` (unique) for the run to download.
    :raises Run.DoesNotExist: if no Run has that key.
    """
    # Local import: this module does not import the model at top level.
    from main.models import Run

    # DownloadRun needs the Run instance (it reads .url), not the key string.
    run = Run.objects.get(run=run_id)
    DownloadRun(run)
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import celery_task #import celery_task from tasks.py
@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    """Queue a Celery download task for every ``Run`` that is still ``Pending``.

    Connected to ``post_save`` for ``Run``, so it runs after each
    ``Run.save()``. The task receives the run's unique ``run`` key instead of
    the model instance, because Celery task arguments must be serializable.
    """
    from functools import partial

    from django.db import transaction

    for run in Run.objects.filter(status="Pending"):
        # Claim the row with a conditional UPDATE instead of run.save():
        # QuerySet.update() does not send post_save, so this receiver is not
        # re-entered. The status="Pending" filter means only the first caller
        # to flip a row gets claimed == 1, so no run is queued twice.
        # "Running" is the in-progress value declared in Run.UPLOAD_STATUS.
        claimed = Run.objects.filter(pk=run.pk, status="Pending").update(status="Running")
        if claimed:
            # Publish only after the current transaction commits (immediately
            # when not inside one), so the worker can see the row. partial()
            # binds this iteration's key; a bare lambda would late-bind `run`.
            transaction.on_commit(partial(celery_task.delay, run.run))
我正在我的 Django 项目中使用 Celery 进行异步任务处理。
我的项目逻辑如下:
- 前端有一个表格(table),每一行都有一个 upload 按钮。用户点击后,会向后端发送一个 payload,其中包含一个指向文件的 URL。
- Django 视图接收该文件(URL),并将其存入数据库表 `Run`。对象一经保存,post_save 信号就会立即触发,运行一个 Celery 任务。
- 该任务会获取具有特定状态的 Run 列表,并对每个 Run 执行下载文件的操作。
如果有多个 Run,我希望异步执行这些下载。请注意,用户可以在前端对不止一行点击上传。
我正在将 RabbitMQ 设置为消息代理(broker)。我已安装 RabbitMQ,并且它正在运行。我还在 settings.py 中设置了 `CELERY_BROKER_URL='amqp://localhost'`。我对接下来该如何配置有点迷茫,能否给我一些指导?我想我需要为我的任务配置 Celery worker。
以下是我目前的代码:
views.py #保存到数据库的视图
class RunsUploadView(APIView):
    """Persist an uploaded run and echo it back to the client.

    The new ``Run`` is built from the request payload (``crawler`` and
    ``run_url``) plus the ``run_id`` URL keyword argument. Saving it sends
    Django's ``post_save`` signal for ``Run``.
    """

    serializer_class = RunsURLSerializer

    def post(self, request, *args, **kwargs):
        """Create and save a ``Run``; respond with its field values as a dict."""
        payload = self.request.data
        crawler = payload.get('crawler')
        run_key = self.kwargs.get("run_id")
        new_run = Run(
            name=f"{crawler}_{run_key}",
            run=run_key,
            url=payload.get("run_url"),
        )
        new_run.save()
        return Response(model_to_dict(new_run))
models.py # Run 保存到表 Run 中,然后触发 post_save 信号。
from django.db import models
class Run(models.Model):
    """One uploaded run: a named run whose file is fetched from ``url``."""

    # Django choices: (stored value, human-readable label) pairs for ``status``.
    UPLOAD_STATUS = (
        ("Pending", "pending"),
        ("Running", "running"),
        ("Success", "success"),
        ("Failed", "failed"),
    )
    # Display name; the upload view builds it as "<crawler>_<run_id>".
    name = models.CharField(max_length=100)
    # External run identifier; unique=True, so the same run cannot be stored twice.
    run = models.CharField(max_length=100, unique=True)
    # Location of the run's file.
    url = models.URLField(max_length=1000)
    # Lifecycle state, one of the UPLOAD_STATUS stored values; new rows start as "Pending".
    status = models.CharField(
        max_length=50, choices=UPLOAD_STATUS, default="Pending")
    # Start/finish timestamps; nullable (NULL until set).
    # NOTE(review): nothing in this snippet writes these fields yet.
    started_at = models.DateTimeField(null=True)
    done_at = models.DateTimeField(null=True)
signals.py # 在 save() 之后处理 post_save 逻辑
from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import DownloadRun
@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    """Claim every ``Pending`` Run and download its file.

    Connected to ``post_save`` for ``Run``, so it runs after each
    ``Run.save()``. ``DownloadRun`` is called directly here, i.e. in the
    process that saved the Run.
    """
    for run in Run.objects.filter(status="Pending"):
        # Claim the row with a conditional UPDATE instead of run.save():
        # QuerySet.update() does not send post_save, so this receiver is not
        # re-entered (re-entry made stale in-memory copies download twice).
        # The status="Pending" filter means only the first caller to flip a
        # row gets claimed == 1.
        # "Running" is the in-progress value declared in Run.UPLOAD_STATUS.
        claimed = Run.objects.filter(pk=run.pk, status="Pending").update(status="Running")
        if not claimed:
            continue
        # Keep the in-memory instance in sync with the row just updated.
        run.status = "Running"
        DownloadRun(run)
tasks.py # 使用类来实现,因为之后我还要添加更多功能。
class DownloadRun:
    """Download the file referenced by a ``Run``.

    The download is started from the constructor: ``__init__`` records the
    run's URL and immediately calls :meth:`download_run`.
    """

    def __init__(self, run):
        """Remember *run*'s URL and start the download.

        :param run: the ``Run`` to download (only its ``url`` attribute is read).
        """
        # Read the field directly: model_to_dict() is not imported in this
        # module, and building a dict of every field to get one is unnecessary.
        self.run_url = run.url
        self.download_run()

    def download_run(self, dest_folder="runs"):
        """Run file is downloaded from url."""
        # TODO: implement the download of self.run_url. As written, this method
        # has no body beyond its docstring, and dest_folder is not used yet.
我找到了解决办法:之前是我没有正确配置 Celery。修改如下:任务用 `@shared_task` 声明,并在信号中通过 `delay()` 调用。
tasks.py
from celery import shared_task #import shared_task decorator from celery
class DownloadRun:
    """Download the file referenced by a ``Run``.

    The download is started from the constructor: ``__init__`` records the
    run's URL and immediately calls :meth:`download_run`.
    """

    def __init__(self, run):
        """Remember *run*'s URL and start the download.

        :param run: the ``Run`` to download (only its ``url`` attribute is read).
        """
        # Read the field directly: model_to_dict() is not imported in this
        # module, and building a dict of every field to get one is unnecessary.
        self.run_url = run.url
        self.download_run()

    def download_run(self, dest_folder="runs"):
        """Run file is downloaded from url."""
        # TODO: implement the download of self.run_url. As written, this method
        # has no body beyond its docstring, and dest_folder is not used yet.
@shared_task
def celery_task(run_id):
    """Celery task: download the file of the Run whose ``run`` key is *run_id*.

    The caller passes the run's unique ``run`` value (a plain string) rather
    than the model instance, because task arguments have to be serializable.
    The instance is therefore re-fetched here, inside the worker.

    :param run_id: value of ``Run.run`` (unique) for the run to download.
    :raises Run.DoesNotExist: if no Run has that key.
    """
    # Local import: this module does not import the model at top level.
    from main.models import Run

    # DownloadRun needs the Run instance (it reads .url), not the key string.
    run = Run.objects.get(run=run_id)
    DownloadRun(run)
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import celery_task #import celery_task from tasks.py
@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    """Queue a Celery download task for every ``Run`` that is still ``Pending``.

    Connected to ``post_save`` for ``Run``, so it runs after each
    ``Run.save()``. The task receives the run's unique ``run`` key instead of
    the model instance, because Celery task arguments must be serializable.
    """
    from functools import partial

    from django.db import transaction

    for run in Run.objects.filter(status="Pending"):
        # Claim the row with a conditional UPDATE instead of run.save():
        # QuerySet.update() does not send post_save, so this receiver is not
        # re-entered. The status="Pending" filter means only the first caller
        # to flip a row gets claimed == 1, so no run is queued twice.
        # "Running" is the in-progress value declared in Run.UPLOAD_STATUS.
        claimed = Run.objects.filter(pk=run.pk, status="Pending").update(status="Running")
        if claimed:
            # Publish only after the current transaction commits (immediately
            # when not inside one), so the worker can see the row. partial()
            # binds this iteration's key; a bare lambda would late-bind `run`.
            transaction.on_commit(partial(celery_task.delay, run.run))