Pytest, Flask, Celery - Celery NoneType Error

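I have a Flask application that uses Celery, and asynchronous processing works fine when the application runs locally. However, when I try to test (with pytest) routes that use Celery tasks, I get this error: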

app/bp_dir/routes.py:12: in <module>
    from app import db, celery_tasks
app/celery_tasks.py:13: in <module>
    @celery.task()
E   AttributeError: 'NoneType' object has no attribute 'task'

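Celery doesn't seem to start up correctly when I run the tests. I think the underlying question is how to set up the pytest fixtures so that routes which import Celery tasks can be tested.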

app/celery_tasks.py looks like this:

import time

from app import celeryapp

celery = celeryapp.celery

@celery.task()
def example_task():
    time.sleep(3)
    return 1

app/__init__.py

...
from app import celeryapp


def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)  # use the argument, not the Config default

    ...

    # Celery
    celery = celeryapp.create_celery_app(app)
    celeryapp.celery = celery

    ...


    return app

app/celeryapp/__init__.py

from celery import Celery

CELERY_TASK_LIST = [
    'app.celery_tasks',
]

db_session = None
celery = None

def create_celery_app(_app=None):
    """
    Create a new Celery object and tie together the Celery config to the app's config.
    Wrap all tasks in the context of the Flask application.
    :param _app: Flask app
    :return: Celery app
    """

    from app import db

    celery = Celery(_app.import_name,
                    backend=_app.config['CELERY_BACKEND_URL'],
                    broker=_app.config['CELERY_BROKER_URL'],
                    include=CELERY_TASK_LIST)

    celery.conf.update(_app.config)
    always_eager = _app.config['TESTING'] or False
    celery.conf.update({'TASK_ALWAYS_EAGER': always_eager,
                        'CELERY_RESULT_BACKEND': 'redis'})

    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            if not celery.conf.CELERY_ALWAYS_EAGER:
                with _app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
            else:
                # special pytest setup
                db.session = db_session
                return TaskBase.__call__(self, *args, **kwargs)

        def after_return(self, status, retval, task_id, args, kwargs, einfo):
            """
            After each Celery task, teardown our db session.
            FMI: https://gist.github.com/twolfson/a1b329e9353f9b575131
            Flask-SQLAlchemy uses create_scoped_session at startup which avoids any setup on a
            per-request basis. This means Celery can piggyback off of this initialization.
            """
            if _app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN']:
                if not isinstance(retval, Exception):
                    db.session.commit()

            # If we aren't in an eager request (i.e. Flask will perform teardown), then teardown
            if not celery.conf.CELERY_ALWAYS_EAGER:
                db.session.remove()

    celery.Task = ContextTask


    return celery

app/celeryapp/celery_worker.py

from app import celeryapp, create_app

app = create_app()
celery = celeryapp.create_celery_app(app)
celeryapp.celery = celery

Most of the Celery setup code is borrowed from this repo: https://github.com/kwiersma/flask-celery-sqlalchemy. The repo has tests here: https://github.com/kwiersma/flask-celery-sqlalchemy/tree/master/tests, but I really can't see what I'm missing.

/testing_fixtures.py

import re
import flask
import pytest
from app import create_app, db

EMAIL = 'testemail@gmail.com'
PASSWORD = 'Password123!'

def get_codes(data_structure):
    return [(str(x)) for x in range(len(data_structure))]

def extract_csrf_token(response: flask.Response) -> str:
    # Is there a better way to get the CSRF token? I couldn't find one.
    return re.search('name="csrf_token" type="hidden" value="([^"]*)"', str(response.data)).group(1)

@pytest.fixture
def flask_app():
    app = create_app()
    app.config['SQLALCHEMY_DATABASE_URI'] = "sqlite://"
    app.config['TESTING'] = True
    app.config['SECRET_KEY'] = 'this is crucial for storing section'
    app.testing = True

    return app

@pytest.fixture
def app_context(flask_app):
    with flask_app.app_context():
        yield

@pytest.fixture
def setup_db(app_context):
    db.create_all()

@pytest.fixture
def test_client(setup_db, flask_app):
    with flask_app.test_client() as c:
        yield c

@pytest.fixture
def registered_user(test_client, flask_app):
    resp = test_client.get("/auth/register")  # to get csrf_token
    csrf_token = extract_csrf_token(resp)

    # follow_redirects=False is important: otherwise you can't tell a successful
    # registration from a failed one
    resp = test_client.post('/auth/register',
                            data={'email': EMAIL, 'password': PASSWORD, 'password2': PASSWORD,
                                  'csrf_token': csrf_token, 'is_test':True},
                            follow_redirects=False)
    assert resp.status_code == 302, "/auth/register should redirect on successful registering"

@pytest.fixture
def logged_client(registered_user, test_client):
    resp = test_client.get("/auth/login")  # to get csrf_token
    csrf_token = extract_csrf_token(resp)

    resp = test_client.post('/auth/login', data={'email': EMAIL, 'password': PASSWORD, 'csrf_token': csrf_token})

    assert resp.status_code == 302, "/auth/login should redirect to another page on successful login"

    yield test_client

    resp = test_client.get('/auth/logout')
    assert resp.status_code == 302

Example test:

from testing_fixtures import *

def test_example_route(logged_client):
    response = logged_client.get('/example_route')
    assert response.status_code == 200

Example route:

from app import db, celery_tasks
...

@example_bp.route('/example_route')
def example_route():

    return '1'  # a Flask view must return a str, dict, or Response, not an int

This is a tricky knot to untangle, so I'll suggest a slight reorganization instead.
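For context, one plausible reading of the traceback (an assumption, since the elided parts of create_app() are not shown) is that app/celery_tasks.py gets imported while celeryapp.celery is still None, so the decorator lookup fails before create_app() has assigned the instance. Here is a minimal stdlib-only sketch of that ordering problem, where FakeCelery and declare_tasks are hypothetical stand-ins rather than real Celery or project code:

```python
import types

# Stand-in for the app.celeryapp module: the attribute starts out as None.
celeryapp = types.SimpleNamespace(celery=None)


class FakeCelery:
    """Hypothetical stand-in for celery.Celery, just enough to decorate a function."""

    def task(self):
        def decorator(func):
            return func
        return decorator


def declare_tasks():
    """Mimics importing app/celery_tasks.py: reads celeryapp.celery right away."""
    celery = celeryapp.celery

    @celery.task()
    def example_task():
        return 1

    return example_task


# "Importing" the tasks before the instance has been assigned fails.
error_message = None
try:
    declare_tasks()
except AttributeError as exc:
    error_message = str(exc)

# Once the instance has been assigned, the very same code works.
celeryapp.celery = FakeCelery()
task = declare_tasks()
```

The same function fails or succeeds depending only on whether the assignment has happened yet, which is the kind of ordering dependency the reorganization tries to remove.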

First, note that you don't need a Celery instance in order to declare tasks.

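from celery import shared_task

# shared_task declares a task without a concrete app instance
# (the module-level celery.task decorator is gone in Celery 5).
@shared_task
def mytask():
    ...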

is enough.

Now, consider creating a Celery instance in app/__init__.py and deferring its configuration until create_app() is called. Something like

from celery import Celery
from flask import Flask

celery = Celery(__name__)

def create_app(config_class=...):
    app = Flask(__name__)
    app.config.from_object(config_class)
    ...
    celery.config_from_object(config_class)

might let you get rid of celeryapp altogether and avoid most of the opportunities for import order to trip you up.

I have a working example you can refer to here, or take a look at the part of the Flask Mega-Tutorial that introduces RQ, an alternative to Celery.