芹菜任务总是等待不同文件中的任务

Celery task always PENDING for task in different file

我正在尝试将我的任务重构到它自己的文件中。但是,这会导致作业状态未更新 - 它始终处于 PENDING 状态。不过任务 运行 没问题。

这是我的app.py

from flask import Flask, jsonify
from celery.task.control import inspect

from jobs import task_one
from factory import create_app, create_celery

app = create_app()
celery = create_celery(app)


@app.route('/run', methods=['GET'])
def run_task():
    # run job in celery
    task = task_one.run()
    return jsonify(name=app.name, status='Task is running', taskid=task.id), 202


@app.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    task = celery.AsyncResult(taskid)
    return jsonify(status=task.state)


def main():
    app.run()


if __name__ == '__main__':
    main()

这是我的factory.py

from flask import Flask
from celery import Celery


def create_app():
    app = Flask(__name__)
    app.config['DEBUG'] = True
    app.config['CELERY_BROKER_URL'] = 'amqp://127.0.0.1'
    app.config['CELERY_RESULT_BACKEND'] = 'rpc'
    app.config['CELERY_TRACK_STARTED'] = True

    return app


def create_celery(app=None):
    app = app or create_app()
    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

这是我的jobs/task_one.py

from time import sleep    
from celery import chain    
from factory import create_celery

celery = create_celery()


@celery.task(name='jobs.long_running_task', bind=True)
def long_running_task(self, x, y):
    sleep(15)
    print 'running:', x, y
    return x + y


@celery.task(name='jobs.long_mapping_task', bind=True)
def long_mapping_task(self, x, y):
    sleep(15)
    print 'mapping:', x, y
    return x + y


def run():
    task = chain(long_running_task.s(1,2), long_mapping_task.s(4))()
    return task

所以,我 运行 rabbitmq,通过发出 celery worker -A app.celery --loglevel=debug --concurrency=1 来使用 celery,然后通过 python app.py 来 运行 安装 Flask 应用程序。

任务 运行 正常,但作业状态始终为待处理。

现在,如果我将所有内容都放入一个文件中,它就可以正常工作。以下代码应该有效:

from time import sleep

from flask import Flask, jsonify
from celery import Celery, chain
from celery.task.control import inspect


app = Flask(__name__)
app.config['DEBUG'] = True
app.config['CELERY_BROKER_URL'] = 'amqp://127.0.0.1'
app.config['CELERY_RESULT_BACKEND'] = 'rpc'
app.config['CELERY_TRACK_STARTED'] = True

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


@celery.task(bind=True)
def long_running_task(self, x, y):
    sleep(15)
    print 'running:', x, y
    return x + y


@celery.task(bind=True)
def long_mapping_task(self, x, y):
    sleep(15)
    print 'mapping:', x, y
    return x + y


@app.route('/run', methods=['GET'])
def run_task():
    # run job in celery
    task = chain(long_running_task.s(1,2), long_mapping_task.s(4))()
    return jsonify(name=app.name, status='Job is running', taskid=task.id), 202

@app.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    task = celery.AsyncResult(taskid)

    return jsonify(status=task.state)


def main():
    app.run()


if __name__ == '__main__':
    main()

我不明白为什么会这样,也不知道如何解决。我在 SO 中看到过其他解决方案,但其中 none 对我的情况有效。感谢任何帮助。

您似乎有 2 个芹菜实例:

app.py celery = create_celery(app)

jobs/task_one.py celery = create_celery()

您应该通过导入在 jobs/task_one.py 中共享在 app.py 中创建的 celery 实例:

from app import celery

请注意,您可能需要移动语句 from jobs import task_one 以避免 apptask_one 模块之间的循环依赖