通过 Celery 在 Django 中异步执行任务的正确方法

The right way to perform a task asynchronously in Django, via Celery

我有一个模仿 Instagram 的 Django 应用程序,用户可以上传照片或表情包,然后他们的粉丝会收到上述照片的通知。

目前,为了发送通知,只要用户上传照片,我就会遍历上传者拥有的每个粉丝,将通知附加到列表中,然后 bulk_create 对象。如:

    fans = UserFan.objects.filter(star=user).values_list('fan',flat=True)
    fan_list = []
    for fan in fans:
        fan_list.append(PhotoObjectSubscription(viewer_id=fan, which_photo=photo, updated_at=time, seen=False, type_of_object='1'))
    PhotoObjectSubscription.objects.bulk_create(fan_list)

简单的东西。请注意,我还在我的应用程序的 VM 上安装了 supervisord,其中我通过 celery 运行 一些基本任务(使用 redis 作为消息代理)。

现在我想作为 celery task 执行上面的 bulk_create 任务;异步地。我的 bulk_create 代码位于用于处理照片上传的同一视图中,因此我认为异步执行此操作会为用户加快该过程。

我是 celery 任务的新手,有人可以通过说明性示例指出我如何将上述 bulk_create 任务变成 celery 任务吗?我已经完成研究,以下是我认为我需要做的事情:

1)在bulk_create语句末尾添加delay()

PhotoObjectSubscription.objects.bulk_create(fan_list).delay()

2) 在tasks.py中添加一个新的任务来处理上面的内容:

@task
def bulk_create_notifications():
    PhotoObjectSubscription.objects.bulk_create(fan_list)

3) settings.py中的CELERYBEAT_SCHEDULE无需添加任何内容,因为该任务不是周期性任务。

我可能不完全正确,所以请帮忙。

您需要传递 fans(如果它实际上是 ValuesListQuerySet,您可能必须将其转换为 list)以及任务需要的所有其他内容(例如 photo.id) 作为任务的参数:

@task
def bulk_create_notifications(fans, photo_id):
    fan_list = []
    for fan in fans:
        fan_list.append(PhotoObjectSubscription(viewer_id=fan, which_photo_id=photo_id, updated_at=time, seen=False, type_of_object='1'))
    PhotoObjectSubscription.objects.bulk_create(fan_list)

然后,您可以通过以下方式异步启动任务:

# call delay on the task and pass it the same params you would pass to the fnc itself
bulk_create_notifications.delay(fans)  

由于参数需要由您的任务队列 (redis) 存储和通信,您只能传递可由您在设置中设置的序列化程序序列化的参数(可能 JSON)。这意味着您应该坚持使用简单类型 s.a。字符串、整数,您不能将模型实例或其列表作为参数传递。

您当然可以从更高的起点开始,只需通过 user.id 并完成任务中的所有数据库工作。