Celery 任务定义在另一个文件中时,状态始终为 PENDING
Celery task always PENDING for task in different file
我正在尝试将我的任务重构到它自己的文件中。但是,这会导致作业状态未更新——它始终处于 PENDING
状态。不过任务本身运行没有问题。
这是我的app.py
from flask import Flask, jsonify
from celery.task.control import inspect
from factory import create_app, create_celery

app = create_app()
celery = create_celery(app)

# Import the jobs module only *after* the Celery instance exists: jobs code
# that does `from app import celery` would otherwise hit a half-initialised
# app module (circular import app -> jobs -> app) and be forced to build its
# own, second Celery instance -- the classic cause of永远 PENDING statuses.
from jobs import task_one


@app.route('/run', methods=['GET'])
def run_task():
    """Kick off the background job chain; respond 202 with the task id."""
    task = task_one.run()
    return jsonify(name=app.name, status='Task is running', taskid=task.id), 202


@app.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    """Report the Celery state (PENDING/STARTED/SUCCESS/...) for *taskid*."""
    task = celery.AsyncResult(taskid)
    return jsonify(status=task.state)


def main():
    # Debug server only; use a WSGI server in production.
    app.run()


if __name__ == '__main__':
    main()
这是我的factory.py
from flask import Flask
from celery import Celery


def create_app():
    """Application factory: build a Flask app carrying the Celery settings."""
    flask_app = Flask(__name__)
    flask_app.config.update(
        DEBUG=True,
        CELERY_BROKER_URL='amqp://127.0.0.1',
        CELERY_RESULT_BACKEND='rpc',
        CELERY_TRACK_STARTED=True,
    )
    return flask_app


def create_celery(app=None):
    """Celery factory: bind a Celery instance to *app*'s configuration.

    Tasks produced by the returned instance execute inside the Flask
    application context, so Flask extensions are usable from task bodies.
    """
    if app is None:
        app = create_app()

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    base_task = celery.Task

    class ContextTask(base_task):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Wrap every task invocation in the Flask app context.
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
这是我的jobs/task_one.py
from time import sleep
from celery import chain
from factory import create_celery

# NOTE(review): this builds a *second* Celery instance, separate from the one
# app.py creates; the Flask side then queries a result backend the worker
# never reports to, so AsyncResult stays PENDING. Prefer sharing the single
# instance created in app.py (e.g. `from app import celery`).
celery = create_celery()


@celery.task(name='jobs.long_running_task', bind=True)
def long_running_task(self, x, y):
    """Simulated slow job: sleep 15s, log the inputs, return x + y."""
    sleep(15)
    print('running:', x, y)  # function-call print (was a py2 print statement)
    return x + y


@celery.task(name='jobs.long_mapping_task', bind=True)
def long_mapping_task(self, x, y):
    """Second stage of the chain: sleep 15s, log, return x + y."""
    sleep(15)
    print('mapping:', x, y)
    return x + y


def run():
    """Start long_running_task(1, 2) chained into long_mapping_task(_, 4)."""
    task = chain(long_running_task.s(1, 2), long_mapping_task.s(4))()
    return task
所以,我启动了 rabbitmq,通过执行 celery worker -A app.celery --loglevel=debug --concurrency=1
来运行 celery worker,然后通过 python app.py
来运行 Flask 应用程序。
任务 运行 正常,但作业状态始终为待处理。
现在,如果我将所有内容都放入一个文件中,它就可以正常工作。以下代码应该有效:
from time import sleep
from flask import Flask, jsonify
from celery import Celery, chain
from celery.task.control import inspect

app = Flask(__name__)
app.config['DEBUG'] = True
app.config['CELERY_BROKER_URL'] = 'amqp://127.0.0.1'
app.config['CELERY_RESULT_BACKEND'] = 'rpc'
app.config['CELERY_TRACK_STARTED'] = True

# A single shared Celery instance: the worker and the /status endpoint read
# the same broker/result-backend config, so AsyncResult reports real states.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


@celery.task(bind=True)
def long_running_task(self, x, y):
    """Simulated slow job: sleep 15s, log the inputs, return x + y."""
    sleep(15)
    print('running:', x, y)  # function-call print (was a py2 print statement)
    return x + y


@celery.task(bind=True)
def long_mapping_task(self, x, y):
    """Second stage of the chain: sleep 15s, log, return x + y."""
    sleep(15)
    print('mapping:', x, y)
    return x + y


@app.route('/run', methods=['GET'])
def run_task():
    """Kick off the chained job; respond 202 with the final task's id."""
    task = chain(long_running_task.s(1, 2), long_mapping_task.s(4))()
    return jsonify(name=app.name, status='Job is running', taskid=task.id), 202


@app.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    """Report the Celery state for *taskid*."""
    task = celery.AsyncResult(taskid)
    return jsonify(status=task.state)


def main():
    app.run()


if __name__ == '__main__':
    main()
我不明白为什么会这样,也不知道如何解决。我在 SO 中看到过其他解决方案,但其中没有一个适用于我的情况。感谢任何帮助。
您似乎有 2 个 Celery 实例:
app.py celery = create_celery(app)
和
jobs/task_one.py celery = create_celery()
您应该通过导入在 jobs/task_one.py 中共享在 app.py 中创建的 celery 实例:
from app import celery
请注意,您可能需要移动语句 from jobs import task_one
以避免 app
和 task_one
模块之间的循环依赖
我正在尝试将我的任务重构到它自己的文件中。但是,这会导致作业状态未更新——它始终处于 PENDING
状态。不过任务本身运行没有问题。
这是我的app.py
from flask import Flask, jsonify
from celery.task.control import inspect
from factory import create_app, create_celery

app = create_app()
celery = create_celery(app)

# Import the jobs module only *after* the Celery instance exists: jobs code
# that does `from app import celery` would otherwise hit a half-initialised
# app module (circular import app -> jobs -> app) and be forced to build its
# own, second Celery instance -- the classic cause of永远 PENDING statuses.
from jobs import task_one


@app.route('/run', methods=['GET'])
def run_task():
    """Kick off the background job chain; respond 202 with the task id."""
    task = task_one.run()
    return jsonify(name=app.name, status='Task is running', taskid=task.id), 202


@app.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    """Report the Celery state (PENDING/STARTED/SUCCESS/...) for *taskid*."""
    task = celery.AsyncResult(taskid)
    return jsonify(status=task.state)


def main():
    # Debug server only; use a WSGI server in production.
    app.run()


if __name__ == '__main__':
    main()
这是我的factory.py
from flask import Flask
from celery import Celery


def create_app():
    """Application factory: build a Flask app carrying the Celery settings."""
    flask_app = Flask(__name__)
    flask_app.config.update(
        DEBUG=True,
        CELERY_BROKER_URL='amqp://127.0.0.1',
        CELERY_RESULT_BACKEND='rpc',
        CELERY_TRACK_STARTED=True,
    )
    return flask_app


def create_celery(app=None):
    """Celery factory: bind a Celery instance to *app*'s configuration.

    Tasks produced by the returned instance execute inside the Flask
    application context, so Flask extensions are usable from task bodies.
    """
    if app is None:
        app = create_app()

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
    celery.conf.update(app.config)

    base_task = celery.Task

    class ContextTask(base_task):
        abstract = True

        def __call__(self, *args, **kwargs):
            # Wrap every task invocation in the Flask app context.
            with app.app_context():
                return base_task.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery
这是我的jobs/task_one.py
from time import sleep
from celery import chain
from factory import create_celery

# NOTE(review): this builds a *second* Celery instance, separate from the one
# app.py creates; the Flask side then queries a result backend the worker
# never reports to, so AsyncResult stays PENDING. Prefer sharing the single
# instance created in app.py (e.g. `from app import celery`).
celery = create_celery()


@celery.task(name='jobs.long_running_task', bind=True)
def long_running_task(self, x, y):
    """Simulated slow job: sleep 15s, log the inputs, return x + y."""
    sleep(15)
    print('running:', x, y)  # function-call print (was a py2 print statement)
    return x + y


@celery.task(name='jobs.long_mapping_task', bind=True)
def long_mapping_task(self, x, y):
    """Second stage of the chain: sleep 15s, log, return x + y."""
    sleep(15)
    print('mapping:', x, y)
    return x + y


def run():
    """Start long_running_task(1, 2) chained into long_mapping_task(_, 4)."""
    task = chain(long_running_task.s(1, 2), long_mapping_task.s(4))()
    return task
所以,我启动了 rabbitmq,通过执行 celery worker -A app.celery --loglevel=debug --concurrency=1
来运行 celery worker,然后通过 python app.py
来运行 Flask 应用程序。
任务 运行 正常,但作业状态始终为待处理。
现在,如果我将所有内容都放入一个文件中,它就可以正常工作。以下代码应该有效:
from time import sleep
from flask import Flask, jsonify
from celery import Celery, chain
from celery.task.control import inspect

app = Flask(__name__)
app.config['DEBUG'] = True
app.config['CELERY_BROKER_URL'] = 'amqp://127.0.0.1'
app.config['CELERY_RESULT_BACKEND'] = 'rpc'
app.config['CELERY_TRACK_STARTED'] = True

# A single shared Celery instance: the worker and the /status endpoint read
# the same broker/result-backend config, so AsyncResult reports real states.
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)


@celery.task(bind=True)
def long_running_task(self, x, y):
    """Simulated slow job: sleep 15s, log the inputs, return x + y."""
    sleep(15)
    print('running:', x, y)  # function-call print (was a py2 print statement)
    return x + y


@celery.task(bind=True)
def long_mapping_task(self, x, y):
    """Second stage of the chain: sleep 15s, log, return x + y."""
    sleep(15)
    print('mapping:', x, y)
    return x + y


@app.route('/run', methods=['GET'])
def run_task():
    """Kick off the chained job; respond 202 with the final task's id."""
    task = chain(long_running_task.s(1, 2), long_mapping_task.s(4))()
    return jsonify(name=app.name, status='Job is running', taskid=task.id), 202


@app.route('/status/<taskid>', methods=['GET'])
def task_status(taskid):
    """Report the Celery state for *taskid*."""
    task = celery.AsyncResult(taskid)
    return jsonify(status=task.state)


def main():
    app.run()


if __name__ == '__main__':
    main()
我不明白为什么会这样,也不知道如何解决。我在 SO 中看到过其他解决方案,但其中没有一个适用于我的情况。感谢任何帮助。
您似乎有 2 个 Celery 实例:
app.py celery = create_celery(app)
和
jobs/task_one.py celery = create_celery()
您应该通过导入在 jobs/task_one.py 中共享在 app.py 中创建的 celery 实例:
from app import celery
请注意,您可能需要移动语句 from jobs import task_one
以避免 app
和 task_one
模块之间的循环依赖