在 Celery 任务 success/failure 上可靠地发送邮件
Send mail on Celery task success/failure reliably
我目前正在使用 Flask 和 Celery 开发生物信息学 Web 服务的模板。该模板显示了用户如何在 Celery worker 上创建任务和 运行 这些任务。
此类服务的一个常见要求是在任务成功或失败时通知用户。我正在使用 Flask-Mail 发送邮件。我的第一次尝试是这样的:
@celery.task(name='app.expensive_greet', bind=True)
def expensive_greet(self, person, total, email):
this_task_id = expensive_greet.request.id
try:
for i in range(total):
time.sleep(0.1)
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total})
if email:
send_success_mail(email, this_task_id)
return 'Greetings, {}!'.format(person)
except SoftTimeLimitExceeded:
if email:
send_failure_mail(email, this_task_id)
return 'Greetings, fallback!'
except Exception:
if email:
send_failure_mail(email, this_task_id)
如您所见,有很多重复代码。我想知道是否可以在自定义 Celery Task
中隔离邮件处理并最终得到:
class MailBase(celery.Task):
abstract = True
def on_success(self, res, task_id, args, kwargs):
_, _, email = args
if email:
send_success_mail(email, task_id)
def on_failure(self, exc, task_id, args, kwargs, einf):
_, _, email = args
if email:
send_failure_mail(email, task_id)
并为任务设置 base=MailBase
。我不喜欢这个解决方案,因为:
- 它仍然有重复的
if
,但我可以接受。
- 如果用户更改任务函数的参数,他们还必须在两个不同的地方修改
MailBase
class。
- 函数参数的解包很丑。
- 由于这是一个主要面向非技术用户的模板,因此简单很重要。
有更好的方法吗?理想情况下,我希望邮件是与任务一起发送的元数据,而不是任务函数的参数,并且能够在不触及任务本身的情况下插入邮件功能。
提前致谢!
如果你只想在任务失败时得到通知,你可以使用 celery built-in email mechanism。
另外如果你还想一意孤行,可以尝试用decorator来包装email相关的操作。
import functools
def send_emails(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
this_task_id = self.request.id
email = kwargs.pop('email', False) # get email and remove it from kwargs
try:
ret = func(self, *args, **kwargs)
except NotifyException as ex:
if email:
send_failure_mail(email, this_task_id)
return ex.value
except Exception:
if email:
send_failure_mail(email, this_task_id)
# It would be better to raise again to allow celery knows the task has failed
raise
else:
if email:
send_success_mail(mail, this_task_id)
return ret
return wrapper
NotifyException 是针对错误发生的情况引入的,但用户并不认为它是失败的,只是想发送一封电子邮件。
class NotifyException(Exception):
'''
This exception would be handled by send_emails decorator, the decorator will
catch it and return its value to outer.
'''
def __init__(self, value):
self.value = value
super(NotifyException, self).__init__(value)
请注意,最好在配置文件中保留 email
参数。
而且现在任务方式会改成这样
@celery.task(name='app.expensive_greet', bind=True)
@send_emails
def expensive_greet(self, person, total):
try:
for i in range(total):
time.sleep(0.1)
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total})
return 'Greetings, {}!'.format(person)
except SoftTimeLimitExceeded:
# Note that raise this exception to allow the decorator catch it
# and return the value of the exception
raise NotifyException('Greetings, fallback!')
希望对您有所帮助!
我目前正在使用 Flask 和 Celery 开发生物信息学 Web 服务的模板。该模板显示了用户如何在 Celery worker 上创建任务和 运行 这些任务。
此类服务的一个常见要求是在任务成功或失败时通知用户。我正在使用 Flask-Mail 发送邮件。我的第一次尝试是这样的:
@celery.task(name='app.expensive_greet', bind=True)
def expensive_greet(self, person, total, email):
this_task_id = expensive_greet.request.id
try:
for i in range(total):
time.sleep(0.1)
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total})
if email:
send_success_mail(email, this_task_id)
return 'Greetings, {}!'.format(person)
except SoftTimeLimitExceeded:
if email:
send_failure_mail(email, this_task_id)
return 'Greetings, fallback!'
except Exception:
if email:
send_failure_mail(email, this_task_id)
如您所见,有很多重复代码。我想知道是否可以在自定义 Celery Task
中隔离邮件处理并最终得到:
class MailBase(celery.Task):
abstract = True
def on_success(self, res, task_id, args, kwargs):
_, _, email = args
if email:
send_success_mail(email, task_id)
def on_failure(self, exc, task_id, args, kwargs, einf):
_, _, email = args
if email:
send_failure_mail(email, task_id)
并为任务设置 base=MailBase
。我不喜欢这个解决方案,因为:
- 它仍然有重复的
if
,但我可以接受。 - 如果用户更改任务函数的参数,他们还必须在两个不同的地方修改
MailBase
class。 - 函数参数的解包很丑。
- 由于这是一个主要面向非技术用户的模板,因此简单很重要。
有更好的方法吗?理想情况下,我希望邮件是与任务一起发送的元数据,而不是任务函数的参数,并且能够在不触及任务本身的情况下插入邮件功能。
提前致谢!
如果你只想在任务失败时得到通知,你可以使用 celery built-in email mechanism。
另外如果你还想一意孤行,可以尝试用decorator来包装email相关的操作。
import functools
def send_emails(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
this_task_id = self.request.id
email = kwargs.pop('email', False) # get email and remove it from kwargs
try:
ret = func(self, *args, **kwargs)
except NotifyException as ex:
if email:
send_failure_mail(email, this_task_id)
return ex.value
except Exception:
if email:
send_failure_mail(email, this_task_id)
# It would be better to raise again to allow celery knows the task has failed
raise
else:
if email:
send_success_mail(mail, this_task_id)
return ret
return wrapper
NotifyException 是针对错误发生的情况引入的,但用户并不认为它是失败的,只是想发送一封电子邮件。
class NotifyException(Exception):
'''
This exception would be handled by send_emails decorator, the decorator will
catch it and return its value to outer.
'''
def __init__(self, value):
self.value = value
super(NotifyException, self).__init__(value)
请注意,最好在配置文件中保留 email
参数。
而且现在任务方式会改成这样
@celery.task(name='app.expensive_greet', bind=True)
@send_emails
def expensive_greet(self, person, total):
try:
for i in range(total):
time.sleep(0.1)
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total})
return 'Greetings, {}!'.format(person)
except SoftTimeLimitExceeded:
# Note that raise this exception to allow the decorator catch it
# and return the value of the exception
raise NotifyException('Greetings, fallback!')
希望对您有所帮助!