使用 Django 默认电子邮件后端,通过 Celery 发送电子邮件
Sending Email With Celery Using Django Default Email Backend
我正在尝试使用 Django 和 Celery 向我的注册用户发送确认电子邮件。我使用 RabbitMQ 作为消息代理(broker)。每次执行代码时,Celery 日志都显示任务已被接收并成功执行,但我没有收到任何电子邮件。
tasks.py
from celery.task import Task
from django.core.mail import send_mail
from django.template import loader
from django.utils.html import strip_tags
from config.celery import app
from config.settings import default
class SendConfirmationEmail(Task):
    """Celery task that emails an account-confirmation message to a user.

    The recipient data (``username``, ``id``, ``hash``, ``email``) should be
    passed as keyword arguments to ``delay()`` / ``apply_async()`` so that it
    is serialized into the task message.  The worker executes the instance
    registered at the bottom of this module, which is created without
    arguments; attributes set by ``__init__`` on a caller-side instance are
    therefore not available in the worker and serve only as a fallback when
    the task is run in-process.
    """

    def __init__(self, *args, **kwargs):
        """Store optional fallback recipient data on the instance."""
        self.user_name = kwargs.get('username')
        self.user_id = kwargs.get('id')
        self.user_hash = kwargs.get('hash')
        self.user_email = kwargs.get('email')

    def send_email(self, **overrides):
        """Render the confirmation template and send it as text plus HTML.

        Args:
            **overrides: optional ``username``, ``id``, ``hash`` and
                ``email`` values; any key that is absent falls back to the
                value stored on the instance by ``__init__``.

        Raises:
            ValueError: if no recipient email address is available.
        """
        user_name = overrides.get('username', self.user_name)
        user_id = overrides.get('id', self.user_id)
        user_hash = overrides.get('hash', self.user_hash)
        user_email = overrides.get('email', self.user_email)

        # Without this guard the recipient list would be [None]; fail loudly
        # so the task does not report success when no address was supplied.
        if not user_email:
            raise ValueError(
                'SendConfirmationEmail needs an "email" value; pass it as a '
                'keyword argument to delay()/apply_async().'
            )

        confirm_mail = loader.render_to_string(
            'mail/confirmation.html',
            {
                'user': user_name,
                'id': user_id,
                'hash': user_hash,
                'domain': default.SITE_URL,
            },
        )
        # Plain-text alternative derived from the rendered HTML.
        text_email = strip_tags(confirm_mail)
        send_mail(
            subject='Confirm Your E-mail',
            message=text_email,
            from_email='no-reply@mysite.com',
            recipient_list=[user_email],
            fail_silently=False,
            html_message=confirm_mail,
        )

    def run(self, *args, **kwargs):
        """Task entry point: send the email described by the task kwargs."""
        self.send_email(**kwargs)


app.register_task(SendConfirmationEmail())
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from apps.siteuser.models import User
from apps.siteuser.tasks import SendConfirmationEmail
@receiver(post_save, sender=User)
def create_employee_details(sender, instance, created, **kwargs):
    """Queue a confirmation email when a ``User`` is saved for the first time.

    Args:
        sender: the model class that sent the signal.
        instance: the saved ``User``.
        created: ``True`` only when the save created a new row.
        **kwargs: remaining ``post_save`` signal arguments (unused).
    """
    if not created:
        return

    payload = {
        'username': instance.first_name,
        'id': instance.id,
        'hash': instance.hash,
        'email': instance.email,
    }
    # The values are handed to delay() so they are carried in the task
    # message; attributes set on this local task object are not transmitted
    # to the worker.  They must be serializable by the task serializer.
    task = SendConfirmationEmail(**payload)
    task.delay(**payload)
电子邮件和 Celery 设置:
# Celery broker connection URL (amqp scheme; user, password, host, port and
# vhost are all embedded in the URL).
CELERY_BROKER_URL = 'amqp://username:password@localhost:5672/vhost'
# SMTP host used for outgoing mail (Mailtrap).
EMAIL_HOST = 'smtp.mailtrap.io'
# SMTP credentials; MY_USERNAME / MY_PASSWORD are not defined in this snippet.
EMAIL_HOST_USER = MY_USERNAME
EMAIL_HOST_PASSWORD = MY_PASSWORD
# SMTP port.
EMAIL_PORT = 2525
Celery 日志:
[2018-05-09 11:50:41,191: INFO/MainProcess] Received task: apps.user.tasks.SendConfirmationEmail[e63c0f5f-7b81-4065-85c1-9ef87acc792a]
[2018-05-09 11:50:41,197: INFO/ForkPoolWorker-1] Task apps.user.tasks.SendConfirmationEmail[e63c0f5f-7b81-4065-85c1-9ef87acc792a] succeeded in 0.004487647999667388s: None
注意:不使用 Celery 时,我可以正常发送邮件;问题是在改用 Celery 之后才出现的。我在开发环境中使用 Mailtrap。
这就是我通常调用 celery 任务的方式
Task.py
from celery import shared_task
@shared_task
def task1(*args, **kwargs):
    """Placeholder shared task: accepts any arguments and does nothing."""
    return None
Caller.py
from task import task1
task1.delay(a,b,c...)
Celery 的任务自动发现(autodiscover)配置是否正确?
我正在尝试使用 Django 和 Celery 向我的注册用户发送确认电子邮件。我使用 RabbitMQ 作为消息代理(broker)。每次执行代码时,Celery 日志都显示任务已被接收并成功执行,但我没有收到任何电子邮件。
tasks.py
from celery.task import Task
from django.core.mail import send_mail
from django.template import loader
from django.utils.html import strip_tags
from config.celery import app
from config.settings import default
class SendConfirmationEmail(Task):
    """Celery task that emails an account-confirmation message to a user.

    The recipient data (``username``, ``id``, ``hash``, ``email``) should be
    passed as keyword arguments to ``delay()`` / ``apply_async()`` so that it
    is serialized into the task message.  The worker executes the instance
    registered at the bottom of this module, which is created without
    arguments; attributes set by ``__init__`` on a caller-side instance are
    therefore not available in the worker and serve only as a fallback when
    the task is run in-process.
    """

    def __init__(self, *args, **kwargs):
        """Store optional fallback recipient data on the instance."""
        self.user_name = kwargs.get('username')
        self.user_id = kwargs.get('id')
        self.user_hash = kwargs.get('hash')
        self.user_email = kwargs.get('email')

    def send_email(self, **overrides):
        """Render the confirmation template and send it as text plus HTML.

        Args:
            **overrides: optional ``username``, ``id``, ``hash`` and
                ``email`` values; any key that is absent falls back to the
                value stored on the instance by ``__init__``.

        Raises:
            ValueError: if no recipient email address is available.
        """
        user_name = overrides.get('username', self.user_name)
        user_id = overrides.get('id', self.user_id)
        user_hash = overrides.get('hash', self.user_hash)
        user_email = overrides.get('email', self.user_email)

        # Without this guard the recipient list would be [None]; fail loudly
        # so the task does not report success when no address was supplied.
        if not user_email:
            raise ValueError(
                'SendConfirmationEmail needs an "email" value; pass it as a '
                'keyword argument to delay()/apply_async().'
            )

        confirm_mail = loader.render_to_string(
            'mail/confirmation.html',
            {
                'user': user_name,
                'id': user_id,
                'hash': user_hash,
                'domain': default.SITE_URL,
            },
        )
        # Plain-text alternative derived from the rendered HTML.
        text_email = strip_tags(confirm_mail)
        send_mail(
            subject='Confirm Your E-mail',
            message=text_email,
            from_email='no-reply@mysite.com',
            recipient_list=[user_email],
            fail_silently=False,
            html_message=confirm_mail,
        )

    def run(self, *args, **kwargs):
        """Task entry point: send the email described by the task kwargs."""
        self.send_email(**kwargs)


app.register_task(SendConfirmationEmail())
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from apps.siteuser.models import User
from apps.siteuser.tasks import SendConfirmationEmail
@receiver(post_save, sender=User)
def create_employee_details(sender, instance, created, **kwargs):
    """Queue a confirmation email when a ``User`` is saved for the first time.

    Args:
        sender: the model class that sent the signal.
        instance: the saved ``User``.
        created: ``True`` only when the save created a new row.
        **kwargs: remaining ``post_save`` signal arguments (unused).
    """
    if not created:
        return

    payload = {
        'username': instance.first_name,
        'id': instance.id,
        'hash': instance.hash,
        'email': instance.email,
    }
    # The values are handed to delay() so they are carried in the task
    # message; attributes set on this local task object are not transmitted
    # to the worker.  They must be serializable by the task serializer.
    task = SendConfirmationEmail(**payload)
    task.delay(**payload)
电子邮件和 Celery 设置:
# Celery broker connection URL (amqp scheme; user, password, host, port and
# vhost are all embedded in the URL).
CELERY_BROKER_URL = 'amqp://username:password@localhost:5672/vhost'
# SMTP host used for outgoing mail (Mailtrap).
EMAIL_HOST = 'smtp.mailtrap.io'
# SMTP credentials; MY_USERNAME / MY_PASSWORD are not defined in this snippet.
EMAIL_HOST_USER = MY_USERNAME
EMAIL_HOST_PASSWORD = MY_PASSWORD
# SMTP port.
EMAIL_PORT = 2525
Celery 日志:
[2018-05-09 11:50:41,191: INFO/MainProcess] Received task: apps.user.tasks.SendConfirmationEmail[e63c0f5f-7b81-4065-85c1-9ef87acc792a]
[2018-05-09 11:50:41,197: INFO/ForkPoolWorker-1] Task apps.user.tasks.SendConfirmationEmail[e63c0f5f-7b81-4065-85c1-9ef87acc792a] succeeded in 0.004487647999667388s: None
注意:不使用 Celery 时,我可以正常发送邮件;问题是在改用 Celery 之后才出现的。我在开发环境中使用 Mailtrap。
这就是我通常调用 celery 任务的方式
Task.py
from celery import shared_task
@shared_task
def task1(*args, **kwargs):
    """Placeholder shared task: accepts any arguments and does nothing."""
    return None
Caller.py
from task import task1
task1.delay(a,b,c...)
Celery 的任务自动发现(autodiscover)配置是否正确?