Django Celery periodic task: how to call a task method that is inside a class
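I want to send mail periodically to specific users. After a user registers on a particular platform, I tried sending the email with Celery and it worked well. Now I want Django Celery to send the mail periodically, and I have set the interval to 15 seconds. More code is here: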
celery.py
app.conf.beat_schedule = {
    'send_mail-every-day-at-4': {
        'task': 'apps.users.usecases.BaseUserUseCase().email_send_task()',
        'schedule': 15
    }
}
My class is at apps.users.usecases.BaseUserUseCase.email_send_task
This is my use case for sending the email:
class BaseUserUseCase:
    ## other code is skipped
    @shared_task
    def email_send_task(self):
        print("done")
        return ConfirmationEmail(context=self.context).send(to=[self.receipent])
How do I call this email_send_task method? Am I doing it right? Any help would be appreciated.
I am not 100% sure, but the following should work for you.
The class:
class BaseUserUseCase:
    def __init__(self):
        self.context = {}
        self.receipent = ""
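Add a tasks.py file to the app folder (tasks.py is the module name that app.autodiscover_tasks() looks for by default, and it matches the 'app_name.tasks.email_send_task' name used in the schedule):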
from path_to.celery import app
from path_to import BaseUserUseCase
# ConfirmationEmail is the asker's own class; import it from wherever the project defines it.

@app.task
def email_send_task():
    # context and receipent are instance attributes, so create an instance first.
    use_case = BaseUserUseCase()
    print("done")
    return ConfirmationEmail(context=use_case.context).send(to=[use_case.receipent])
Configure a celery.py file:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings')
app = Celery('project_name')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# add cron beat
app.conf.beat_schedule = {
    'send-email-celery-task': {
        'task': 'app_name.tasks.email_send_task',
        'schedule': crontab(hour=0, minute=1)
    },
}
Could you try it and let me know whether it works for you?
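To enable class-based tasks:
1. Change your class to inherit from celery.Task
2. Change your @shared_task method email_send_task() into run()
3. Call Celery.register_task() for your class. The result will be a callable task.
4. Call the callable task from step 3 directly to enqueue the task manually.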
References:
- https://docs.celeryproject.org/en/4.0/whatsnew-4.0.html#the-task-base-class-no-longer-automatically-register-tasks
- https://docs.celeryproject.org/en/stable/userguide/application.html#breaking-the-chain
- https://docs.celeryproject.org/en/stable/userguide/application.html#abstract-tasks
File structure
.
├── apps
│   └── users
│       └── usecases.py
└── my_proj
    ├── celery.py
    └── settings.py
celery.py
import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "my_proj.settings")  # Only if using Django. Otherwise remove this line.

app = Celery("my_app")
app.conf.update(
    imports=['apps.users.usecases'],
    beat_schedule={
        'send_mail-every-day-at-4': {
            'task': 'apps.users.usecases.BaseUserUseCase',
            'schedule': 5,
        },
    },
)
usecases.py
from celery import Task
from my_proj.celery import app

class BaseUserUseCase(Task):
    def __init__(self, context, recipient):
        self.context = context
        self.recipient = recipient

    def run(self, context=None, recipient=None):  # Optional arguments context and recipient only if you need to explicitly change it for some calls
        target_context = context or self.context
        target_recipient = recipient or self.recipient
        print(f"Send email with {target_context} to {target_recipient}")

BaseUserUseCaseTask = app.register_task(
    BaseUserUseCase(
        context={'setting': 'default'},
        recipient="default@email.com",
    )
)
Logs (producer)
$ celery --app=my_proj beat --loglevel=INFO
[2021-08-20 09:50:22,193: INFO/MainProcess] Scheduler: Sending due task send_mail-every-day-at-4 (apps.users.usecases.BaseUserUseCase)
[2021-08-20 09:50:27,181: INFO/MainProcess] Scheduler: Sending due task send_mail-every-day-at-4 (apps.users.usecases.BaseUserUseCase)
Logs (consumer)
$ celery --app=my_proj worker --queues=celery --loglevel=INFO
[tasks]
. apps.users.usecases.BaseUserUseCase
[2021-08-20 09:50:22,206: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[3bce46e8-98c0-410e-a156-83e293ba6337] received
[2021-08-20 09:50:22,207: WARNING/ForkPoolWorker-4] Send email with {'setting': 'default'} to default@email.com
[2021-08-20 09:50:22,207: WARNING/ForkPoolWorker-4]
[2021-08-20 09:50:22,207: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[3bce46e8-98c0-410e-a156-83e293ba6337] succeeded in 0.0002498120002201176s: None
[2021-08-20 09:50:27,183: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[e1d2de2a-3e7d-4253-9641-41fd328a17ce] received
[2021-08-20 09:50:27,183: WARNING/ForkPoolWorker-4] Send email with {'setting': 'default'} to default@email.com
[2021-08-20 09:50:27,184: WARNING/ForkPoolWorker-4]
[2021-08-20 09:50:27,184: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[e1d2de2a-3e7d-4253-9641-41fd328a17ce] succeeded in 0.0001804589992389083s: None
If you want to change context and recipient for the scheduled task:
celery.py - just change the following lines.
...
    beat_schedule={
        'send_mail-every-day-at-4': {
            'task': 'apps.users.usecases.BaseUserUseCase',
            'schedule': 5,
            'kwargs': {
                'context': {'setting': 'custom'},
                'recipient': "custom@email.com",
            },
        },
    },
...
Logs (consumer):
[2021-08-20 09:54:36,530: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[6b69b79b-6764-4d83-ad24-9b0723dd8c79] received
[2021-08-20 09:54:36,531: WARNING/ForkPoolWorker-4] Send email with {'setting': 'custom'} to custom@email.com
[2021-08-20 09:54:36,531: WARNING/ForkPoolWorker-4]
[2021-08-20 09:54:36,532: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[6b69b79b-6764-4d83-ad24-9b0723dd8c79] succeeded in 0.0012146830003985087s: None
[2021-08-20 09:54:41,498: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[a20d34e7-3214-4130-aba2-5544238096d0] received
[2021-08-20 09:54:41,499: WARNING/ForkPoolWorker-4] Send email with {'setting': 'custom'} to custom@email.com
[2021-08-20 09:54:41,499: WARNING/ForkPoolWorker-4]
[2021-08-20 09:54:41,500: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[a20d34e7-3214-4130-aba2-5544238096d0] succeeded in 0.00047696000001451466s: None
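One detail of run() worth knowing when overriding values through kwargs: because it uses `context or self.context`, a falsy override such as an empty dict or an empty string falls back to the default without any warning. A small plain-Python sketch of the difference follows; the helper names pick_with_or and pick_with_none_check and the DEFAULT_CONTEXT constant are hypothetical and exist only to contrast the two styles:

```python
DEFAULT_CONTEXT = {"setting": "default"}


def pick_with_or(context=None):
    # Same pattern as run(): any falsy value falls back to the default.
    return context or DEFAULT_CONTEXT


def pick_with_none_check(context=None):
    # Falls back only when the caller passed nothing (None).
    return DEFAULT_CONTEXT if context is None else context


custom = {"setting": "custom"}
with_or_custom = pick_with_or(custom)           # override is used
with_or_empty = pick_with_or({})                # empty dict is ignored
with_check_empty = pick_with_none_check({})     # empty dict is respected
```

If an empty context is a legitimate value in your project, the `is None` style is probably the safer choice; otherwise the `or` form in the answer is fine.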
If you intend to call the task manually, e.g. from one of your Django views:
>>> from apps.users.usecases import BaseUserUseCaseTask
>>> BaseUserUseCaseTask.apply_async()
<AsyncResult: fd347270-59b8-4cec-8772-cf82a79e60df>
>>> BaseUserUseCaseTask.apply_async(kwargs={'context': {'setting': 'manual'}, 'recipient': "manual@email.com"})
<AsyncResult: 13d9df7e-e1c4-4f50-847f-72a413404c82>
Logs (consumer):
[2021-08-20 09:57:19,324: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[fd347270-59b8-4cec-8772-cf82a79e60df] received
[2021-08-20 09:57:19,324: WARNING/ForkPoolWorker-4] Send email with {'setting': 'default'} to default@email.com
[2021-08-20 09:57:19,324: WARNING/ForkPoolWorker-4]
[2021-08-20 09:57:19,325: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[fd347270-59b8-4cec-8772-cf82a79e60df] succeeded in 0.00026283199986210093s: None
[2021-08-20 12:35:29,238: INFO/MainProcess] Task apps.users.usecases.BaseUserUseCase[13d9df7e-e1c4-4f50-847f-72a413404c82] received
[2021-08-20 12:35:29,240: WARNING/ForkPoolWorker-4] Send email with {'setting': 'manual'} to manual@email.com
[2021-08-20 12:35:29,240: WARNING/ForkPoolWorker-4]
[2021-08-20 12:35:29,240: INFO/ForkPoolWorker-4] Task apps.users.usecases.BaseUserUseCase[13d9df7e-e1c4-4f50-847f-72a413404c82] succeeded in 0.000784056000156852s: None