Pytest, Flask, Celery - Celery NoneType 错误
Pytest, Flask, Celery - Celery NoneType Error
我有一个使用 Celery 的 Flask 应用程序,当应用程序在本地 运行 时,异步处理工作正常。但是,当我尝试测试 (pytest) 使用 Celery 任务的路由时,出现此错误:
app/bp_dir/routes.py:12: in <module>
from app import db, celery_tasks
app/celery_tasks.py:13: in <module>
@celery.task()
E AttributeError: 'NoneType' object has no attribute 'task'
在我测试时,Celery 似乎没有正确启动。我认为潜在的问题是如何设置 pytest fixtures 以便可以测试导入 Celery 任务的路由。
app/celery_tasks.py 看起来像这样:
from app import celeryapp
celery = celeryapp.celery
@celery.task()
def example_task():
time.sleep(3)
return 1
app/init.py
...
from app import celeryapp
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(Config)
...
# Celery
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery
...
return app
app/celeryapp/init.py
from celery import Celery
CELERY_TASK_LIST = [
'app.celery_tasks',
]
db_session = None
celery = None
def create_celery_app(_app=None):
"""
Create a new Celery object and tie together the Celery config to the app's config.
Wrap all tasks in the context of the Flask application.
:param _app: Flask app
:return: Celery app
"""
from app import db
celery = Celery(_app.import_name,
backend=_app.config['CELERY_BACKEND_URL'],
broker=_app.config['CELERY_BROKER_URL'],
include=CELERY_TASK_LIST)
celery.conf.update(_app.config)
always_eager = _app.config['TESTING'] or False
celery.conf.update({'TASK_ALWAYS_EAGER': always_eager,
'CELERY_RESULT_BACKEND': 'redis'})
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if not celery.conf.CELERY_ALWAYS_EAGER:
with _app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
# special pytest setup
db.session = db_session
return TaskBase.__call__(self, *args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""
After each Celery task, teardown our db session.
FMI: https://gist.github.com/twolfson/a1b329e9353f9b575131
Flask-SQLAlchemy uses create_scoped_session at startup which avoids any setup on a
per-request basis. This means Celery can piggyback off of this initialization.
"""
if _app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
if not isinstance(retval, Exception):
db.session.commit()
# If we aren't in an eager request (i.e. Flask will perform teardown), then teardown
if not celery.conf.CELERY_ALWAYS_EAGER:
db.session.remove()
celery.Task = ContextTask
return celery
app/celeryapp/celery_worker.py
from app import celeryapp, create_app
app = create_app()
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery
Celery 设置的代码大部分是从这个 [repo]https://github.com/kwiersma/flask-celery-sqlalchemy. The repo has tests [here]https://github.com/kwiersma/flask-celery-sqlalchemy/tree/master/tests 借来的,但我真的看不出我遗漏了什么。
/testing_fixtures.py
import re
import flask
import pytest
from app import create_app, db
EMAIL = 'testemail@gmail.com'
PASSWORD = 'Password123!'
def get_codes(data_structure):
return [(str(x)) for x in range(len(data_structure))]
def extract_csrf_token(response: flask.Response) -> str:
# is there better way to get CSRF token? I couldn't find one
return re.search('name="csrf_token" type="hidden" value="([^"]*)"', str(response.data)).group(1)
@pytest.fixture
def flask_app():
app = create_app()
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite://"
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'this is crucial for storing section'
app.testing = True
return app
@pytest.fixture
def app_context(flask_app):
with flask_app.app_context():
yield
@pytest.fixture
def setup_db(app_context):
db.create_all()
@pytest.fixture
def test_client(setup_db, flask_app):
with flask_app.test_client() as c:
yield c
@pytest.fixture
def registered_user(test_client, flask_app):
resp = test_client.get("/auth/register") # to get csrf_token
csrf_token = extract_csrf_token(resp)
# follow_redirect=False is important - otherwise you won't be able to tell the difference between successful
# and faulty registration
resp = test_client.post('/auth/register',
data={'email': EMAIL, 'password': PASSWORD, 'password2': PASSWORD,
'csrf_token': csrf_token, 'is_test':True},
follow_redirects=False)
assert resp.status_code == 302, "/auth/register should redirect on successful registering"
@pytest.fixture
def logged_client(registered_user, test_client):
resp = test_client.get("/auth/login") # to get csrf_token
csrf_token = extract_csrf_token(resp)
resp = test_client.post('/auth/login', data={'email': EMAIL, 'password': PASSWORD, 'csrf_token': csrf_token})
assert resp.status_code == 302, "/auth/login should redirect to another page on successful login"
yield test_client
resp = test_client.get('/auth/logout')
assert resp.status_code == 302
示例测试:
from testing_fixtures import *
def test_example_route(logged_client):
response = logged_client.get('/example_route')
assert response.status_code == 200
示例路线:
from app import db, celery_tasks
...
@example_bp.route('/example_route')
def example_route():
return 1
解开一个棘手的结,所以我会提供一个轻微的重组。
首先,请注意您不需要 Celery 实例来声明任务。
import celery
@celery.task
def mytask():
...
足够了。
现在,考虑在 app/__init__.py
中创建一个 Celery 实例,并推迟初始化它直到 create_app()
被调用。像
celery = Celery(__name__)
def create_app(config_class=...):
app = Flask(__name__)
app.config.from_object(config_class)
...
celery.config_from_object(config_class)
可能会让您完全摆脱 celeryapp
,避免大多数因进口订单而让您犯规的机会。
我有一个工作示例,你可以参考 here,
或者看看 Flask Mega 教程在
introduces Rq,Celery 的替代品。
我有一个使用 Celery 的 Flask 应用程序,当应用程序在本地 运行 时,异步处理工作正常。但是,当我尝试测试 (pytest) 使用 Celery 任务的路由时,出现此错误:
app/bp_dir/routes.py:12: in <module>
from app import db, celery_tasks
app/celery_tasks.py:13: in <module>
@celery.task()
E AttributeError: 'NoneType' object has no attribute 'task'
在我测试时,Celery 似乎没有正确启动。我认为潜在的问题是如何设置 pytest fixtures 以便可以测试导入 Celery 任务的路由。
app/celery_tasks.py 看起来像这样:
from app import celeryapp
celery = celeryapp.celery
@celery.task()
def example_task():
time.sleep(3)
return 1
app/init.py
...
from app import celeryapp
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(Config)
...
# Celery
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery
...
return app
app/celeryapp/init.py
from celery import Celery
CELERY_TASK_LIST = [
'app.celery_tasks',
]
db_session = None
celery = None
def create_celery_app(_app=None):
"""
Create a new Celery object and tie together the Celery config to the app's config.
Wrap all tasks in the context of the Flask application.
:param _app: Flask app
:return: Celery app
"""
from app import db
celery = Celery(_app.import_name,
backend=_app.config['CELERY_BACKEND_URL'],
broker=_app.config['CELERY_BROKER_URL'],
include=CELERY_TASK_LIST)
celery.conf.update(_app.config)
always_eager = _app.config['TESTING'] or False
celery.conf.update({'TASK_ALWAYS_EAGER': always_eager,
'CELERY_RESULT_BACKEND': 'redis'})
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
if not celery.conf.CELERY_ALWAYS_EAGER:
with _app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
else:
# special pytest setup
db.session = db_session
return TaskBase.__call__(self, *args, **kwargs)
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""
After each Celery task, teardown our db session.
FMI: https://gist.github.com/twolfson/a1b329e9353f9b575131
Flask-SQLAlchemy uses create_scoped_session at startup which avoids any setup on a
per-request basis. This means Celery can piggyback off of this initialization.
"""
if _app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
if not isinstance(retval, Exception):
db.session.commit()
# If we aren't in an eager request (i.e. Flask will perform teardown), then teardown
if not celery.conf.CELERY_ALWAYS_EAGER:
db.session.remove()
celery.Task = ContextTask
return celery
app/celeryapp/celery_worker.py
from app import celeryapp, create_app
app = create_app()
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery
Celery 设置的代码大部分是从这个 [repo]https://github.com/kwiersma/flask-celery-sqlalchemy. The repo has tests [here]https://github.com/kwiersma/flask-celery-sqlalchemy/tree/master/tests 借来的,但我真的看不出我遗漏了什么。
/testing_fixtures.py
import re
import flask
import pytest
from app import create_app, db
EMAIL = 'testemail@gmail.com'
PASSWORD = 'Password123!'
def get_codes(data_structure):
return [(str(x)) for x in range(len(data_structure))]
def extract_csrf_token(response: flask.Response) -> str:
# is there better way to get CSRF token? I couldn't find one
return re.search('name="csrf_token" type="hidden" value="([^"]*)"', str(response.data)).group(1)
@pytest.fixture
def flask_app():
app = create_app()
app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite://"
app.config['TESTING'] = True
app.config['SECRET_KEY'] = 'this is crucial for storing section'
app.testing = True
return app
@pytest.fixture
def app_context(flask_app):
with flask_app.app_context():
yield
@pytest.fixture
def setup_db(app_context):
db.create_all()
@pytest.fixture
def test_client(setup_db, flask_app):
with flask_app.test_client() as c:
yield c
@pytest.fixture
def registered_user(test_client, flask_app):
resp = test_client.get("/auth/register") # to get csrf_token
csrf_token = extract_csrf_token(resp)
# follow_redirect=False is important - otherwise you won't be able to tell the difference between successful
# and faulty registration
resp = test_client.post('/auth/register',
data={'email': EMAIL, 'password': PASSWORD, 'password2': PASSWORD,
'csrf_token': csrf_token, 'is_test':True},
follow_redirects=False)
assert resp.status_code == 302, "/auth/register should redirect on successful registering"
@pytest.fixture
def logged_client(registered_user, test_client):
resp = test_client.get("/auth/login") # to get csrf_token
csrf_token = extract_csrf_token(resp)
resp = test_client.post('/auth/login', data={'email': EMAIL, 'password': PASSWORD, 'csrf_token': csrf_token})
assert resp.status_code == 302, "/auth/login should redirect to another page on successful login"
yield test_client
resp = test_client.get('/auth/logout')
assert resp.status_code == 302
示例测试:
from testing_fixtures import *
def test_example_route(logged_client):
response = logged_client.get('/example_route')
assert response.status_code == 200
示例路线:
from app import db, celery_tasks
...
@example_bp.route('/example_route')
def example_route():
return 1
解开一个棘手的结,所以我会提供一个轻微的重组。
首先,请注意您不需要 Celery 实例来声明任务。
import celery
@celery.task
def mytask():
...
足够了。
现在,考虑在 app/__init__.py
中创建一个 Celery 实例,并推迟初始化它直到 create_app()
被调用。像
celery = Celery(__name__)
def create_app(config_class=...):
app = Flask(__name__)
app.config.from_object(config_class)
...
celery.config_from_object(config_class)
可能会让您完全摆脱 celeryapp
,避免大多数因进口订单而让您犯规的机会。
我有一个工作示例,你可以参考 here, 或者看看 Flask Mega 教程在 introduces Rq,Celery 的替代品。