在生产中收到类型为 KeyError 的未注册任务

Received unregistered task of type KeyError on production

我有 Celery 周期性任务,它会做一些工作,然后给我发邮件。

它在本地机器上运行完美,但在 DO Ubuntu 服务器 上失败并出现错误:
Received unregistered task of type KeyError('periodic_tasks.send_email_task',).

我的 periodic_tasks.py 文件如下所示:

app = Celery('periodic_tasks', broker='')
logger = get_task_logger(__name__)

@app.task(soft_time_limit=2000, max_retries=2)
def send_email_task(task_db_id):
    db = DBMethods()
    client_listings = dw.generate_client_listings(db)
    current_task = db.get_scheduled_task_data(task_db_id)
    query = current_task.get('task_params').get('query')
    days_stay = current_task.get('task_params').get('days_stay')
    proxy = current_task.get('task_params').get('proxy')
    adults_count = int(current_task.get('task_params').get('adults_count'))
    pets_count = int(current_task.get('task_params').get('pets_count'))
    to_email = current_task.get('task_params').get('to_email')
    cc_email = current_task.get('task_params').get('cc_email')
    additional_text = current_task.get('task_params').get('additional_text')

    task_id = str(send_email_task.request.id),
    task_data = {
        "task_id": task_id,
        "task_name": "send_email_task",
        "task_args": [client_listings, query, proxy, days_stay, adults_count, pets_count,
                                 to_email, cc_email, additional_text],
        "task_start_at": datetime.now(),
        "task_status": 0
    }
    db.start_scheduled_task(task_data)

    try:
        logger.info(email_worker_new(db, client_listings, query, proxy, days_stay, adults_count, pets_count,
                                 to_email, cc_email, additional_text))

        db.finish_scheduled_task(task_id, 1, datetime.now(), 'success')
    except SoftTimeLimitExceeded:
        db.finish_scheduled_task(task_id, 9, datetime.now(), 'timeout')

app.conf.beat_schedule = {
    "send-email-001-task": {
        "task": "periodic_tasks.send_email_task",
        "schedule": crontab(minute="0", hour="*/2"),
        "args": ('email_001', )
    },
}

运行 celery with command - celery -A periodic_tasks worker -B --loglevel=info in supervisord

请帮帮我。我在生产上做错了什么?

这是典型的"problem"。

-您需要re-deploy所有机器运行ning Celery worker 的最新代码,以便他们注册新任务。

您需要一个好的部署策略。您可以做的最简单的事情是构建一个 Python 轮子,打包所有 Celery 任务,然后在所有 运行 Celery 的机器上执行 pip install -U your-project。当然,你也需要重启Celery...

一旦您的新代码出现,并且您的 Celery worker 启动并 运行ning,您可以检查您的新任务是否已被 运行ning celery -A periodic_tasks inspect registered 注册。您应该会在此列表中看到您的新任务。