在生产中收到类型为 KeyError 的未注册任务
Received unregistered task of type KeyError on production
我有 Celery 周期性任务,它会做一些工作,然后给我发邮件。
它在本地机器上运行完美,但在 DO Ubuntu 服务器 上失败并出现错误:
Received unregistered task of type KeyError('periodic_tasks.send_email_task',)
.
我的 periodic_tasks.py 文件如下所示:
app = Celery('periodic_tasks', broker='')
logger = get_task_logger(__name__)
@app.task(soft_time_limit=2000, max_retries=2)
def send_email_task(task_db_id):
db = DBMethods()
client_listings = dw.generate_client_listings(db)
current_task = db.get_scheduled_task_data(task_db_id)
query = current_task.get('task_params').get('query')
days_stay = current_task.get('task_params').get('days_stay')
proxy = current_task.get('task_params').get('proxy')
adults_count = int(current_task.get('task_params').get('adults_count'))
pets_count = int(current_task.get('task_params').get('pets_count'))
to_email = current_task.get('task_params').get('to_email')
cc_email = current_task.get('task_params').get('cc_email')
additional_text = current_task.get('task_params').get('additional_text')
task_id = str(send_email_task.request.id),
task_data = {
"task_id": task_id,
"task_name": "send_email_task",
"task_args": [client_listings, query, proxy, days_stay, adults_count, pets_count,
to_email, cc_email, additional_text],
"task_start_at": datetime.now(),
"task_status": 0
}
db.start_scheduled_task(task_data)
try:
logger.info(email_worker_new(db, client_listings, query, proxy, days_stay, adults_count, pets_count,
to_email, cc_email, additional_text))
db.finish_scheduled_task(task_id, 1, datetime.now(), 'success')
except SoftTimeLimitExceeded:
db.finish_scheduled_task(task_id, 9, datetime.now(), 'timeout')
app.conf.beat_schedule = {
"send-email-001-task": {
"task": "periodic_tasks.send_email_task",
"schedule": crontab(minute="0", hour="*/2"),
"args": ('email_001', )
},
}
运行 celery with command - celery -A periodic_tasks worker -B --loglevel=info
in supervisord
请帮帮我。我在生产上做错了什么?
这是典型的"problem"。
-您需要re-deploy所有机器运行ning Celery worker 的最新代码,以便他们注册新任务。
您需要一个好的部署策略。您可以做的最简单的事情是构建一个 Python 轮子,打包所有 Celery 任务,然后在所有 运行 Celery 的机器上执行 pip install -U your-project
。当然,你也需要重启Celery...
一旦您的新代码出现,并且您的 Celery worker 启动并 运行ning,您可以检查您的新任务是否已被 运行ning celery -A periodic_tasks inspect registered
注册。您应该会在此列表中看到您的新任务。
我有 Celery 周期性任务,它会做一些工作,然后给我发邮件。
它在本地机器上运行完美,但在 DO Ubuntu 服务器 上失败并出现错误:
Received unregistered task of type KeyError('periodic_tasks.send_email_task',)
.
我的 periodic_tasks.py 文件如下所示:
app = Celery('periodic_tasks', broker='')
logger = get_task_logger(__name__)
@app.task(soft_time_limit=2000, max_retries=2)
def send_email_task(task_db_id):
db = DBMethods()
client_listings = dw.generate_client_listings(db)
current_task = db.get_scheduled_task_data(task_db_id)
query = current_task.get('task_params').get('query')
days_stay = current_task.get('task_params').get('days_stay')
proxy = current_task.get('task_params').get('proxy')
adults_count = int(current_task.get('task_params').get('adults_count'))
pets_count = int(current_task.get('task_params').get('pets_count'))
to_email = current_task.get('task_params').get('to_email')
cc_email = current_task.get('task_params').get('cc_email')
additional_text = current_task.get('task_params').get('additional_text')
task_id = str(send_email_task.request.id),
task_data = {
"task_id": task_id,
"task_name": "send_email_task",
"task_args": [client_listings, query, proxy, days_stay, adults_count, pets_count,
to_email, cc_email, additional_text],
"task_start_at": datetime.now(),
"task_status": 0
}
db.start_scheduled_task(task_data)
try:
logger.info(email_worker_new(db, client_listings, query, proxy, days_stay, adults_count, pets_count,
to_email, cc_email, additional_text))
db.finish_scheduled_task(task_id, 1, datetime.now(), 'success')
except SoftTimeLimitExceeded:
db.finish_scheduled_task(task_id, 9, datetime.now(), 'timeout')
app.conf.beat_schedule = {
"send-email-001-task": {
"task": "periodic_tasks.send_email_task",
"schedule": crontab(minute="0", hour="*/2"),
"args": ('email_001', )
},
}
运行 celery with command - celery -A periodic_tasks worker -B --loglevel=info
in supervisord
请帮帮我。我在生产上做错了什么?
这是典型的"problem"。
-您需要re-deploy所有机器运行ning Celery worker 的最新代码,以便他们注册新任务。
您需要一个好的部署策略。您可以做的最简单的事情是构建一个 Python 轮子,打包所有 Celery 任务,然后在所有 运行 Celery 的机器上执行 pip install -U your-project
。当然,你也需要重启Celery...
一旦您的新代码出现,并且您的 Celery worker 启动并 运行ning,您可以检查您的新任务是否已被 运行ning celery -A periodic_tasks inspect registered
注册。您应该会在此列表中看到您的新任务。