How do I find task ids for Celery task events based on the arguments passed to them?

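I'm stuck on something I thought would be simple. I'm trying to work out whether I can find all task IDs given a known set of arguments. Is this possible with the Celery 4.4 API, or should I write my own results interface table in the Django ORM and search it myself?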

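My use case is one I'd expect to be fairly common. I have scheduled tasks that create Activity and Notification objects for feeds and user profiles. These are set to retry until they succeed, because I need to make sure those objects get created.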

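However, under heavy load, when the async tasks can fall behind, a user might do something like post a comment and then delete it before the activity or notification object has been created. In a case like that, the task has to be cancelled or it will keep requeueing forever.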

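The obvious solution to me is to look up the IDs of those tasks from the events API and revoke them, but I can't find an interface in the API docs that clearly lets me do this. I think the answer may be in celery.events.state.Task, but if that is the interface for querying tasks, I'm not sure whether the args are searchable. My activity tasks are passed the object ID and the created boolean from a Django post_save signal.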

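I can grab all tasks of the relevant type, such as project.activity.create_comment_activity, but then I'd have to loop over every task, unpack its args and check them, which seems terrible for scalability.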
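For comparison, the linear scan described above can be sketched in plain Python. The record shape (dicts with hypothetical `id`, `name` and `args` keys, where `args` is the repr string of the positional arguments) is an assumption made for illustration, not the format of any particular Celery API. The point is that every candidate has to be visited, parsed and compared, so the cost grows with the number of tasks.

```python
import ast
from typing import Iterable, List, Mapping, Tuple


def find_task_ids(
    tasks: Iterable[Mapping[str, str]],
    task_name: str,
    wanted_args: Tuple,
) -> List[str]:
    """Return IDs of tasks named `task_name` whose positional args equal `wanted_args`.

    Assumes each record has hypothetical 'id', 'name' and 'args' keys, with
    'args' holding the repr of the positional args, e.g. "(42, True)".
    This is an O(n) scan: every record is visited and parsed.
    """
    matches = []
    for task in tasks:
        if task["name"] != task_name:
            continue
        try:
            # literal_eval safely rebuilds Python literals from the repr string
            parsed = ast.literal_eval(task["args"])
        except (ValueError, SyntaxError):
            continue  # repr contains non-literals (e.g. "<object ...>"); skip it
        # Compare as tuples so "(42, True)" and "[42, True]" both match
        if isinstance(parsed, (list, tuple)) and tuple(parsed) == tuple(wanted_args):
            matches.append(task["id"])
    return matches


tasks = [
    {"id": "a1", "name": "project.activity.create_comment_activity", "args": "(42, True)"},
    {"id": "b2", "name": "project.activity.create_comment_activity", "args": "(7, True)"},
    {"id": "c3", "name": "project.other.task", "args": "(42, True)"},
    {"id": "d4", "name": "project.activity.create_comment_activity", "args": "[42, True]"},
]
print(find_task_ids(tasks, "project.activity.create_comment_activity", (42, True)))
# prints ['a1', 'd4']
```

A database query with an indexed column avoids this per-record work, which is why pushing the filter into the results backend is attractive.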

I must be missing a trick here; otherwise I'll just write my own interface to the results backend and search tasks with the Django ORM.

I dug into the django_celery_results module and found that it provides a TaskResult model, which gives a native Django ORM interface to the stored results. So you can search results with TaskResult.objects.filter.

As a side note, although the Celery docs cover integrating the django_celery_results module, they do not mention this TaskResult model or its use. The model is documented in the module's own docs.

Example:

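from django_celery_results.models import TaskResult

# Grab all tasks that have not succeeded yet and have the PK of the comment
# being deleted in their args. The task is only ever scheduled with
# created=True, and task_args is a text column, so compare against the repr
# (assumes task_args holds the plain repr, e.g. '(42, True)'; check a stored
# row, since the format can differ between django_celery_results versions).
# task_name/task_args are only populated when result_extended is enabled.
results = TaskResult.objects.filter(
    task_args=repr((instance.pk, True)),
    task_name='theden_django.activity.tasks.create_comment_activity',
).exclude(
    status='SUCCESS'
)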
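A note on matching the `created` flag: writing `(instance.pk, True or False)` to mean "either value" does not work. Python's `or` returns its first truthy operand, so the expression collapses to `True` before the lookup is ever built. A minimal demonstration, using a hypothetical primary key of 42:

```python
pk = 42  # hypothetical comment primary key

# `or` returns its first truthy operand, so `True or False` is simply True
lookup = (pk, True or False)
print(lookup)  # (42, True)

# To cover both flag values, list each candidate explicitly
candidates = [(pk, True), (pk, False)]
# The strings a text-column comparison would use, assuming task_args stores the repr
as_text = [str(c) for c in candidates]
print(as_text)  # ['(42, True)', '(42, False)']
```

If both values really are needed, something like `task_args__in=as_text` should express "either" on a text column, again assuming `task_args` holds the plain repr; inspecting a stored row first is the safest way to confirm the format.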

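For a more complete example, here is the final post_delete handler for my Comment object, which revokes every task whose ID it finds, together with the post_save handler that schedules those tasks: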


from django.dispatch import receiver
from django.db.models import signals
from django_celery_results.models import TaskResult
from theden_django.core import celery_app
from theden_django.comments.models import Comment
from theden_django.activity.tasks import create_comment_activity


@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
    """Creates an activity when comments are created"""
    del sender
    del kwargs
    if created:
        # Only scheduled when created=True, so the args are always (pk, True)
        create_comment_activity.delay(instance.pk, created)


@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
    """Ensures any pending or running tasks are revoked when a comment is deleted"""
    del sender
    del kwargs

    results = TaskResult.objects.filter(
        task_args=repr((instance.pk, True)),
        task_name='theden_django.activity.tasks.create_comment_activity',
    ).exclude(
        status='SUCCESS'
    )

    for result in results:
        # A retried task keeps its ID, so revoking it also stops further retries
        celery_app.control.revoke(task_id=result.task_id)

Caveat

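This implementation has one big caveat: it only finds tasks that have been attempted at least once, because a task has no row in the results backend until then. I still think there must be a way to do this through the events API. One mitigation is to make the revocation itself a delayed task, which should ensure it runs after the tasks already in the queue. To be extra careful, I've set this task to run with a 5-minute delay, so even if the workers are very slow, the tasks should be recorded in the results backend before the revoke task runs.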

handlers.py:

from django.dispatch import receiver
from django.db.models import signals
from theden_django.comments.models import Comment
from theden_django.activity.tasks import (
    create_comment_activity,
    revoke_pending_activity_tasks,
)


@receiver(signals.post_save, sender=Comment)
def schedule_comment_activity_task(sender, instance, created, **kwargs):
    """Creates an activity when comments are created"""
    del sender
    del kwargs
    if created:
        create_comment_activity.delay(instance.pk, created)


@receiver(signals.post_delete, sender=Comment)
def revoke_comment_activity_task(sender, instance, **kwargs):
    """Schedules revocation of pending activity tasks when a comment is deleted"""
    del sender
    del kwargs

    # delay() would forward countdown to the task as a keyword argument, so use
    # apply_async() to actually postpone execution by 5 minutes. The args are
    # sent as a repr string because JSON serialization turns a tuple into a list.
    revoke_pending_activity_tasks.apply_async(
        args=(
            repr((instance.pk, True)),
            'theden_django.activity.tasks.create_comment_activity',
        ),
        countdown=300,
    )
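Why arguments handed to another task need care: Celery 4 serializes task arguments as JSON by default, and JSON has no tuple type, so a tuple passed to `delay()` or `apply_async()` reaches the worker as a list whose `str()` is `"[42, True]"` rather than `"(42, True)"`. A stdlib-only illustration, where 42 is a hypothetical comment PK and `json` stands in for Celery's serializer:

```python
import json

sent = (42, True)  # hypothetical (comment_pk, created) pair

# JSON has arrays but no tuples, so a tuple comes back as a list
received = json.loads(json.dumps(sent))
print(received)       # [42, True]
print(str(received))  # [42, True]  (not "(42, True)")

# A repr string survives the JSON round trip unchanged
sent_repr = repr(sent)
print(json.loads(json.dumps(sent_repr)))  # (42, True)
```

Sending the repr string means the worker can compare it directly against a repr-style `task_args` value without rebuilding a tuple.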

tasks.py:

from __future__ import absolute_import, unicode_literals

import logging
from psycopg2 import OperationalError as psycopg2OperationalError
from django.core.exceptions import ObjectDoesNotExist
from django.db import Error
from django.db.utils import OperationalError
from celery import shared_task
from django_celery_results.models import TaskResult
from theden_django.core import celery_app
from theden_django.comments.models import Comment
# create_activity() is the author's own helper; import it from wherever it is
# defined in your project (its module is not shown in this post).

LOGGER = logging.getLogger(__name__)


@shared_task(bind=True, max_retries=None)
def create_comment_activity(self, comment_pk, created):
    """
    | Async task that creates an activity for a comment.
    :param `integer` comment_pk:
    :param `boolean` created:
    :return:
    """
    try:
        instance = Comment.objects.get(pk=comment_pk)
    except ObjectDoesNotExist as missing_comment_exception:
        LOGGER.error(
            'Unable to find comment %s. Rescheduling task.',
            comment_pk
        )
        self.retry(exc=missing_comment_exception, countdown=60)
    except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
        LOGGER.error(
            'Unable to create comment activity for %s due to database error. Rescheduling task.',
            comment_pk
        )
        self.retry(exc=db_conn_exception, countdown=60)
    else:
        try:
            create_activity(instance, instance.user.pk, created, 'comment')
        except ObjectDoesNotExist as missing_member_exception:
            LOGGER.error(
                'Unable to find actor %s for activity. Rescheduling',
                comment_pk
            )
            self.retry(exc=missing_member_exception, countdown=60)
        except (Error, OperationalError, psycopg2OperationalError) as db_conn_exception:
            LOGGER.error(
                'Unable to create comment activity for %s due to database error. Rescheduling task.',
                comment_pk
            )
            self.retry(exc=db_conn_exception, countdown=60)


# Not bound (no bind=True): a bound task would receive the task instance as its
# first argument and shift task_args/task_name by one.
@shared_task
def revoke_pending_activity_tasks(task_args, task_name):
    """
    | Async task that cancels other tasks
    :param `str` task_args: repr of the target tasks' args, e.g. "(42, True)"
    :param `str` task_name: full dotted name of the target task
    :return:
    """

    results = TaskResult.objects.filter(
        task_args=task_args,
        task_name=task_name,
    ).exclude(
        status='SUCCESS'
    )

    for result in results:
        celery_app.control.revoke(task_id=result.task_id)
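One alternative that sidesteps the caveat above (this is not what the post implements) is to record each task ID at the moment it is enqueued: `delay()` returns an `AsyncResult` whose `.id` is the task ID, so tasks that have never run can still be found. The sketch below uses a hypothetical `PendingTaskIndex` class backed by an in-memory dict purely to show the keyed lookup. A real deployment would need shared storage such as a database table or cache, and each returned ID would then be passed to `celery_app.control.revoke()`.

```python
from collections import defaultdict
from typing import DefaultDict, Hashable, List, Set, Tuple

Key = Tuple[str, Tuple[Hashable, ...]]


class PendingTaskIndex:
    """Hypothetical in-memory index from (task_name, args) to enqueued task IDs.

    IDs are recorded when a task is enqueued, so lookups also find tasks that
    have never run. Illustration only: real use needs shared storage.
    """

    def __init__(self) -> None:
        self._ids: DefaultDict[Key, Set[str]] = defaultdict(set)

    def record(self, task_name: str, args: Tuple[Hashable, ...], task_id: str) -> None:
        # Call right after scheduling, e.g. with the `.id` of the AsyncResult
        self._ids[(task_name, tuple(args))].add(task_id)

    def forget(self, task_name: str, args: Tuple[Hashable, ...], task_id: str) -> None:
        # Call once a task has succeeded so it no longer counts as pending
        key = (task_name, tuple(args))
        if key in self._ids:
            self._ids[key].discard(task_id)

    def pop_pending(self, task_name: str, args: Tuple[Hashable, ...]) -> List[str]:
        # Remove and return every pending ID for these args (dict lookup, no scan)
        return sorted(self._ids.pop((task_name, tuple(args)), set()))


index = PendingTaskIndex()
name = "theden_django.activity.tasks.create_comment_activity"
index.record(name, (42, True), "id-1")
index.record(name, (42, True), "id-2")
index.record(name, (7, True), "id-3")
index.forget(name, (42, True), "id-2")
print(index.pop_pending(name, (42, True)))  # ['id-1']
print(index.pop_pending(name, (42, True)))  # []
```

The trade-off is an extra write per enqueued task in exchange for a lookup that does not depend on the task having run.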