在芹菜任务中模拟一个电话
Mocking out a call within a celery task
我有一个运行 celery 任务的烧瓶应用程序。我正在尝试模拟发生在该任务深处的单个 API 调用。
views.py
from mypackage.task_module import my_task
@app.route('/run_task')
def run_task():
task = my_task.delay()
return some_response
task_module.py
from mypackage.some_module import SomeClass
@celery.task
def my_task():
return SomeClass().some_function()
some_module.py
from mypackage.xyz import external_service
class SomeClass(object):
def some_function(self):
#do some stuff
result = external_service(some_param)
if 'x' in result:
#do something
elif 'y' in result:
#do something else
我想模拟 result = external_service()
行,这样我就可以触发第一个或第二个代码路径。
这就是我正在尝试的:
@mock.patch('mypackage.some_module.external_service', autospec=True)
def test_x_path(my_mock):
my_mock.return_value = {'x': some_val}
#run test, expect 'x' code path to run
但是,这不起作用,因为(我认为)补丁发生在 Flask 的 Python 进程中,而不是 Celery 使用的进程。模拟任务本身不会起作用,因为我要测试的是 当外部服务 returns 'x'
或 'y'
时任务的行为方式].
不胜感激。
一个不错的选择是在您的测试配置中将 CELERY_ALWAYS_EAGER
设置为 True
。这使得对 Celery 的所有调用都是同步的。参见documentation for this option。使用此选项,您在 Flask 进程中设置的任何模拟都应该在 Celery 任务中工作。
附带的好处是,您的测试配置得到了简化,因为您不需要 Celery worker。
UPDATE:经过评论中的讨论,您似乎不想或不能摆脱 Celery worker 的测试配置。在这种情况下,我可以提供三种我认为可以满足您需要的解决方案:
写一个remote control command that mocks your Celery task, then have the test code run it on all your workers with broadcast().
定义一个 custom command line option for your worker, say --test
. Then add a bootstep 检查此参数并进行模拟。
创建一个替代模块以在 -A
命令行参数中为 Celery 工作人员提供。这应该是原始模块的相同副本,但添加了模拟。然后使用这个替代模块启动您的工作人员进行测试。
希望您对这三个选项中的一个满意!
为测试函数创建一个设置
class TestCeleryTask(TestCase):
def setUp(self):
app.config['CELERY_ALWAYS_EAGER'] = True
app.config['BROKER_BACKEND'] = 'memory'
app.config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True
def test_task(self):
# test it
我有一个运行 celery 任务的烧瓶应用程序。我正在尝试模拟发生在该任务深处的单个 API 调用。
views.py
from mypackage.task_module import my_task
@app.route('/run_task')
def run_task():
task = my_task.delay()
return some_response
task_module.py
from mypackage.some_module import SomeClass
@celery.task
def my_task():
return SomeClass().some_function()
some_module.py
from mypackage.xyz import external_service
class SomeClass(object):
def some_function(self):
#do some stuff
result = external_service(some_param)
if 'x' in result:
#do something
elif 'y' in result:
#do something else
我想模拟 result = external_service()
行,这样我就可以触发第一个或第二个代码路径。
这就是我正在尝试的:
@mock.patch('mypackage.some_module.external_service', autospec=True)
def test_x_path(my_mock):
my_mock.return_value = {'x': some_val}
#run test, expect 'x' code path to run
但是,这不起作用,因为(我认为)补丁发生在 Flask 的 Python 进程中,而不是 Celery 使用的进程。模拟任务本身不会起作用,因为我要测试的是 当外部服务 returns 'x'
或 'y'
时任务的行为方式].
不胜感激。
一个不错的选择是在您的测试配置中将 CELERY_ALWAYS_EAGER
设置为 True
。这使得对 Celery 的所有调用都是同步的。参见documentation for this option。使用此选项,您在 Flask 进程中设置的任何模拟都应该在 Celery 任务中工作。
附带的好处是,您的测试配置得到了简化,因为您不需要 Celery worker。
UPDATE:经过评论中的讨论,您似乎不想或不能摆脱 Celery worker 的测试配置。在这种情况下,我可以提供三种我认为可以满足您需要的解决方案:
写一个remote control command that mocks your Celery task, then have the test code run it on all your workers with broadcast().
定义一个 custom command line option for your worker, say
--test
. Then add a bootstep 检查此参数并进行模拟。创建一个替代模块以在
-A
命令行参数中为 Celery 工作人员提供。这应该是原始模块的相同副本,但添加了模拟。然后使用这个替代模块启动您的工作人员进行测试。
希望您对这三个选项中的一个满意!
为测试函数创建一个设置
class TestCeleryTask(TestCase):
def setUp(self):
app.config['CELERY_ALWAYS_EAGER'] = True
app.config['BROKER_BACKEND'] = 'memory'
app.config['CELERY_EAGER_PROPAGATES_EXCEPTIONS'] = True
def test_task(self):
# test it