在 Django 的 post_save 中使用 RabbitMQ 和 Celery

Using RabbitMQ with Celery in Django post_save

我正在我的 Django 项目上使用 Celery 应用异步任务处理。

我的项目逻辑:

如果有多个 run(运行记录),我希望异步执行这些操作。请注意,用户可以在前端一次点击上传不止一行。

我正在将 RabbitMQ 设置为消息代理(broker)。我已安装 RabbitMQ 并且它正在运行。我也在 settings.py 中设置了 CELERY_BROKER_URL='amqp://localhost'。我对接下来的配置步骤有点迷茫,能给我一些指导吗?我想我需要为我的任务配置 Celery worker。

以下是我目前的代码:

views.py #保存到数据库的视图

class RunsUploadView(APIView):
    """API endpoint that records an uploaded run in the database.

    A POST stores one ``Run`` row built from the ``run_id`` URL keyword
    argument and the ``crawler`` / ``run_url`` values of the request body,
    then returns the stored row as a dict.
    """

    serializer_class = RunsURLSerializer

    def post(self, request, *args, **kwargs):
        """Create and save a ``Run`` and respond with its field values."""
        payload = self.request.data
        run_id = self.kwargs.get("run_id")

        # Build the row in one go; name is "<crawler>_<run_id>".
        new_run = Run(
            name=f"{payload.get('crawler')}_{run_id}",
            run=run_id,
            url=payload.get("run_url"),
        )
        new_run.save()

        return Response(model_to_dict(new_run))

models.py # Run 保存到 Run 表,然后触发 post_save 信号。

from django.db import models


class Run(models.Model):
    """One uploaded run: its name, unique key, source URL and status."""

    # Choices for ``status`` as (stored value, label) pairs; each stored
    # value is the capitalised form of its lowercase label.
    UPLOAD_STATUS = tuple(
        (label.capitalize(), label)
        for label in ("pending", "running", "success", "failed")
    )

    name = models.CharField(max_length=100)
    # Unique key of the run; no two rows may share it.
    run = models.CharField(max_length=100, unique=True)
    url = models.URLField(max_length=1000)
    # New rows start out as "Pending".
    status = models.CharField(
        choices=UPLOAD_STATUS,
        default="Pending",
        max_length=50,
    )
    # Both timestamps may be NULL in the database.
    started_at = models.DateTimeField(null=True)
    done_at = models.DateTimeField(null=True)

signals.py # 在 save() 之后处理 post_save 逻辑

from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import DownloadRun


@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    """Start a download for every ``Run`` that is still pending.

    Connected to ``post_save`` for ``Run``, so it executes each time a run
    is saved. Every pending run is switched to ``"Running"`` and handed to
    ``DownloadRun``, whose constructor performs the download synchronously.

    Args:
        sender: The model class that sent the signal (``Run``).
        **kwargs: Remaining signal arguments; not used.
    """
    for run in Run.objects.filter(status="Pending"):
        # Change the status with a queryset ``update()`` rather than
        # ``run.save()``: ``save()`` would fire ``post_save`` again and
        # re-enter this handler while the outer loop still holds its
        # already-fetched list of pending runs, so the same run would be
        # downloaded more than once. Filtering on ``status="Pending"``
        # also skips a run that another save has claimed in the meantime.
        claimed = Run.objects.filter(pk=run.pk, status="Pending").update(
            status="Running"  # "Started" is not one of Run.UPLOAD_STATUS
        )
        if not claimed:
            continue
        run.status = "Running"  # keep the in-memory instance in sync
        DownloadRun(run)

tasks.py # 使用类,因为之后我还要添加更多功能。

class DownloadRun:
    """Download the file that a run points to.

    Instantiating the class starts the download immediately.
    """

    def __init__(self, run):
        """Remember the run's URL and start the download.

        Args:
            run: Object exposing the download address as ``run.url``
                (a ``Run`` model instance in this project).
        """
        # Plain attribute access replaces ``model_to_dict(run)["url"]``: it
        # yields the same value without building a dict of every field and
        # without needing ``model_to_dict`` in scope.
        self.run_url = run.url

        self.download_run()

    def download_run(self, dest_folder="runs"):
        """Download the run file from ``self.run_url``.

        Args:
            dest_folder: Presumably the folder the file is written to; the
                download code itself is not shown in this snippet.
        """

我找到了解决办法:之前是我没有正确配置 Celery。

tasks.py

from celery import shared_task #import shared_task decorator from celery
class DownloadRun:
    """Download the file that a run points to.

    Instantiating the class starts the download immediately.
    """

    def __init__(self, run):
        """Remember the run's URL and start the download.

        Args:
            run: Object exposing the download address as ``run.url``
                (a ``Run`` model instance in this project).
        """
        # Plain attribute access replaces ``model_to_dict(run)["url"]``: it
        # yields the same value without building a dict of every field and
        # without needing ``model_to_dict`` in scope.
        self.run_url = run.url

        self.download_run()

    def download_run(self, dest_folder="runs"):
        """Download the run file from ``self.run_url``.

        Args:
            dest_folder: Presumably the folder the file is written to; the
                download code itself is not shown in this snippet.
        """

@shared_task
def celery_task(run_id):
    """Download one run inside a Celery worker.

    Args:
        run_id: Value of the run's unique ``Run.run`` key, as sent by the
            ``post_save`` handler through ``celery_task.delay(run.run)``.

    Raises:
        Run.DoesNotExist: If no run with that key exists.
    """
    # Imported locally so this module does not need the models at import
    # time.
    from main.models import Run

    # ``DownloadRun`` reads the URL from a ``Run`` instance, so resolve the
    # key to its database row instead of handing over the raw key string.
    DownloadRun(Run.objects.get(run=run_id))

signals.py

from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import celery_task #import celery_task from tasks.py


@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    """Queue a Celery download task for every ``Run`` that is still pending.

    Connected to ``post_save`` for ``Run``, so it executes each time a run
    is saved. Every pending run is switched to ``"Running"`` and its unique
    ``run`` key is sent to ``celery_task``.

    Args:
        sender: The model class that sent the signal (``Run``).
        **kwargs: Remaining signal arguments; not used.
    """
    for run in Run.objects.filter(status="Pending"):
        # Change the status with a queryset ``update()`` rather than
        # ``run.save()``: ``save()`` would fire ``post_save`` again and
        # re-enter this handler while the outer loop still holds its
        # already-fetched list of pending runs, so the same run would be
        # queued more than once. Filtering on ``status="Pending"`` also
        # skips a run that another save has claimed in the meantime.
        claimed = Run.objects.filter(pk=run.pk, status="Pending").update(
            status="Running"  # "Started" is not one of Run.UPLOAD_STATUS
        )
        if not claimed:
            continue
        # Send only the unique key: task arguments have to be serialised
        # for the broker, so the task receives the key, not the instance.
        celery_task.delay(run.run)