在 Celery 任务 success/failure 上可靠地发送邮件

Send mail on Celery task success/failure reliably

我目前正在使用 Flask 和 Celery 开发生物信息学 Web 服务的模板。该模板显示了用户如何在 Celery worker 上创建任务和 运行 这些任务。

此类服务的一个常见要求是在任务成功或失败时通知用户。我正在使用 Flask-Mail 发送邮件。我的第一次尝试是这样的:

@celery.task(name='app.expensive_greet', bind=True)
def expensive_greet(self, person, total, email):
    this_task_id = expensive_greet.request.id

    try:
        for i in range(total):
            time.sleep(0.1)
            self.update_state(state='PROGRESS',
                              meta={'current': i, 'total': total})

        if email:
            send_success_mail(email, this_task_id)
        return 'Greetings, {}!'.format(person)
    except SoftTimeLimitExceeded:
        if email:
            send_failure_mail(email, this_task_id)
        return 'Greetings, fallback!'
    except Exception:
        if email:
            send_failure_mail(email, this_task_id)

如您所见,有很多重复代码。我想知道是否可以在自定义 Celery Task 中隔离邮件处理并最终得到:

class MailBase(celery.Task):
    abstract = True
    def on_success(self, res, task_id, args, kwargs):
        _, _, email = args
        if email:
            send_success_mail(email, task_id)

    def on_failure(self, exc, task_id, args, kwargs, einf):
        _, _, email = args
        if email:
            send_failure_mail(email, task_id)

并为任务设置 base=MailBase。我不喜欢这个解决方案,因为:

  1. 它仍然有重复的 if,但我可以接受。
  2. 如果用户更改任务函数的参数,他们还必须在两个不同的地方修改 MailBase class。
  3. 函数参数的解包很丑。
  4. 由于这是一个主要面向非技术用户的模板,因此简单很重要。

有更好的方法吗?理想情况下,我希望邮件是与任务一起发送的元数据,而不是任务函数的参数,并且能够在不触及任务本身的情况下插入邮件功能。

提前致谢!

如果你只想在任务失败时得到通知,你可以使用 celery built-in email mechanism

另外如果你还想一意孤行,可以尝试用decorator来包装email相关的操作。

import functools


def send_emails(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        this_task_id = self.request.id
        email = kwargs.pop('email', False)  # get email and remove it from kwargs
        try:
            ret = func(self, *args, **kwargs)
        except NotifyException as ex:
            if email:
                send_failure_mail(email, this_task_id)
            return ex.value
        except Exception:
            if email:
                send_failure_mail(email, this_task_id)
            # It would be better to raise again to allow celery knows the task has failed
            raise
        else:
            if email:
                send_success_mail(mail, this_task_id)
            return ret
    return wrapper

NotifyException 是针对错误发生的情况引入的,但用户并不认为它是失败的,只是想发送一封电子邮件。

class NotifyException(Exception):
    '''
    This exception would be handled by send_emails decorator, the decorator will
    catch it and return its value to outer.
    '''
    def __init__(self, value):
        self.value = value
        super(NotifyException, self).__init__(value)

请注意,最好在配置文件中保留 email 参数。

而且现在任务方式会改成这样

@celery.task(name='app.expensive_greet', bind=True)
@send_emails
def expensive_greet(self, person, total):
    try:
        for i in range(total):
            time.sleep(0.1)
            self.update_state(state='PROGRESS',
                              meta={'current': i, 'total': total})
        return 'Greetings, {}!'.format(person)
    except SoftTimeLimitExceeded:
        # Note that raise this exception to allow the decorator catch it
        # and return the value of the exception
        raise NotifyException('Greetings, fallback!')

希望对您有所帮助!