Celery 为每个失败的任务发送邮件
Celery send mail for every failed task
我在我的 Django 应用程序中使用芹菜,我设置了芹菜花来监视芹菜的任务。我有设置任务,当他们 register/submit/FP 等事件时,电子邮件会发送给用户。现在 Flower 向我提供了任务的详细信息及其状态。现在,对于每个失败的任务,我都希望将一封电子邮件发送到我的帐户,这样我就不必每天为失败的任务检查花。 我在 settings.py 文件中做了以下配置
CELERY_SEND_TASK_ERROR_EMAILS = True
和ADMINS
.
EMAIL_USE_TLS = True
EMAIL_HOST_USER = 'noreply@xyz.com'
EMAIL_HOST_PASSWORD = 'xyz123@'
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
EMAIL_HOST = 'xyz.abc.com'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
这些是 "From" 电子邮件地址的设置。
几天前,我的一位团队成员不小心更改了上述 email_host 的密码,忘记更新设置文件。 由于 SMTP 身份验证错误,任务失败并不是因为太晚了。
有没有办法解决这个问题,即使发生 SMTP 身份验证错误,我也会立即收到来自 celery 的电子邮件?我不太确定这一点。
是否有任何其他工具可以监控我的任务,并且对于每个失败的任务,它都会向我发送邮件。
您可以配置Datadog https://www.datadoghq.com/
它可以监视和跟踪您的动态基础架构。对于生产服务器上发生的每个错误,您都会收到一封电子邮件。
我发现创建了一个 table 用于维护数据库中的任务。所以我只是简单地创建了一个脚本,它将每小时检查 table 中过去一小时的记录是否有失败的任务,如果发现任何失败的任务,它将发送一封电子邮件。
script.py
#!venv/bin/python2
import os
from django.conf import settings
if __name__ == '__main__' and __package__ is None:
os.sys.path.append(
os.path.dirname(
os.path.dirname(
os.path.abspath(__file__))))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "rest_apis.settings")
import django
django.setup()
from django.core.mail import EmailMessage
from djcelery.models import TaskMeta
from datetime import datetime, timedelta, time
USERS_TO_NOTIFY = ['ops@yopmail.com']
TIME_THRESHOLD_INTERVAL = 60
def send_email(email_subject_line, email_body):
email = EmailMessage(email_subject_line,
email_body,
settings.EMAIL_HOST_USER,
USERS_TO_NOTIFY
)
email.send()
def main():
current_time = datetime.now() # Get Current TimeStamp
time_threshold = current_time - timedelta(minutes=TIME_THRESHOLD_INTERVAL) # Get 60 minutes past current time stamp
celery_taskmeta_objects = TaskMeta.objects.filter(status="FAILURE", date_done__gte=time_threshold)
email_body = "Below are the tasks which failed : "
if celery_taskmeta_objects.exists():
for celery_taskmeta in celery_taskmeta_objects:
print celery_taskmeta.task_id
email_body += "\n\ntask_id : %s" % celery_taskmeta.task_id
email_body += "\nstatus : %s" % celery_taskmeta.status
email_body += "\ndate : %s" % celery_taskmeta.date_done
email_body += "\ntraceback :"
email_body += "\n%s\n\n" % celery_taskmeta.traceback
email_subject_line = '[URGENT] Celery task failure in last %s minutes' % (TIME_THRESHOLD_INTERVAL)
send_email(email_subject_line, email_body)
main()
现在在一封电子邮件中,我也得到了完整的堆栈跟踪和任务的 ID。现在我的要求是每小时检查一次,所以我只是将脚本放在 crontab 中。现在您可以根据您的基本需要更改时间阈值并相应地工作。
我在我的 Django 应用程序中使用芹菜,我设置了芹菜花来监视芹菜的任务。我有设置任务,当他们 register/submit/FP 等事件时,电子邮件会发送给用户。现在 Flower 向我提供了任务的详细信息及其状态。现在,对于每个失败的任务,我都希望将一封电子邮件发送到我的帐户,这样我就不必每天为失败的任务检查花。 我在 settings.py 文件中做了以下配置
CELERY_SEND_TASK_ERROR_EMAILS = True
和ADMINS
.
EMAIL_USE_TLS = True
EMAIL_HOST_USER = 'noreply@xyz.com'
EMAIL_HOST_PASSWORD = 'xyz123@'
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
EMAIL_HOST = 'xyz.abc.com'
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
这些是 "From" 电子邮件地址的设置。 几天前,我的一位团队成员不小心更改了上述 email_host 的密码,忘记更新设置文件。 由于 SMTP 身份验证错误,任务失败并不是因为太晚了。
有没有办法解决这个问题,即使发生 SMTP 身份验证错误,我也会立即收到来自 celery 的电子邮件?我不太确定这一点。
是否有任何其他工具可以监控我的任务,并且对于每个失败的任务,它都会向我发送邮件。
您可以配置Datadog https://www.datadoghq.com/
它可以监视和跟踪您的动态基础架构。对于生产服务器上发生的每个错误,您都会收到一封电子邮件。
我发现创建了一个 table 用于维护数据库中的任务。所以我只是简单地创建了一个脚本,它将每小时检查 table 中过去一小时的记录是否有失败的任务,如果发现任何失败的任务,它将发送一封电子邮件。
script.py
#!venv/bin/python2
import os
from django.conf import settings
if __name__ == '__main__' and __package__ is None:
os.sys.path.append(
os.path.dirname(
os.path.dirname(
os.path.abspath(__file__))))
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "rest_apis.settings")
import django
django.setup()
from django.core.mail import EmailMessage
from djcelery.models import TaskMeta
from datetime import datetime, timedelta, time
USERS_TO_NOTIFY = ['ops@yopmail.com']
TIME_THRESHOLD_INTERVAL = 60
def send_email(email_subject_line, email_body):
email = EmailMessage(email_subject_line,
email_body,
settings.EMAIL_HOST_USER,
USERS_TO_NOTIFY
)
email.send()
def main():
current_time = datetime.now() # Get Current TimeStamp
time_threshold = current_time - timedelta(minutes=TIME_THRESHOLD_INTERVAL) # Get 60 minutes past current time stamp
celery_taskmeta_objects = TaskMeta.objects.filter(status="FAILURE", date_done__gte=time_threshold)
email_body = "Below are the tasks which failed : "
if celery_taskmeta_objects.exists():
for celery_taskmeta in celery_taskmeta_objects:
print celery_taskmeta.task_id
email_body += "\n\ntask_id : %s" % celery_taskmeta.task_id
email_body += "\nstatus : %s" % celery_taskmeta.status
email_body += "\ndate : %s" % celery_taskmeta.date_done
email_body += "\ntraceback :"
email_body += "\n%s\n\n" % celery_taskmeta.traceback
email_subject_line = '[URGENT] Celery task failure in last %s minutes' % (TIME_THRESHOLD_INTERVAL)
send_email(email_subject_line, email_body)
main()
现在在一封电子邮件中,我也得到了完整的堆栈跟踪和任务的 ID。现在我的要求是每小时检查一次,所以我只是将脚本放在 crontab 中。现在您可以根据您的基本需要更改时间阈值并相应地工作。