如何测试将数据发送到接收器的 Faust 代理?
How to test a Faust agent that sends data to a sink?
我正在尝试使用 pytest 为我的 Faust 应用程序编写单元测试。我已经参考了文档 here 但它没有提到当我的 Faust 代理将数据发送到接收器时该怎么做。
没有水槽,我的测试工作正常,但是当我使用水槽时,我得到这个错误:
RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
INFO faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...
我尝试了各种方法:例如修补我的 Faust 应用程序中将数据发送到接收器的装饰器,尝试在没有装饰器的情况下测试我的功能(通过尝试绕过它),修补接收器参数在我的 Faust 应用程序中有一个 None 值(因此它不会将我的数据发送到接收器),等等。我没有任何运气。
这是我的浮士德特工:
app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')
detection_topic = app.topic(dx_topic)
graph_topic = app.topic(gwh_topic)
@app.agent(detection_topic, sink=[graph_topic])
async def processDetections(detections):
detection_count = 0
async for detection in detections:
detection_count += 1
# r.set("detection_count", detection_count)
yield detection
这是我当前的测试代码:
import ml_exporter
patch('ml_exporter.graph_topic', None)
def create_app():
return faust.App('ml-exporter', value_serializer='json')
@pytest.fixture()
def test_app(event_loop):
app = create_app()
app.finalize()
app.flow_control.resume()
return app
@pytest.mark.asyncio()
async def test_processDetections(test_app):
async with ml_exporter.processDetections.test_context() as agent:
event = await agent.put('hey')
assert agent.results[event.message.offset] == 'hey'
当我运行这个测试时,我得到了与上面提到的相同的错误。有什么方法可以成功测试我的 Faust 应用程序吗?
谢谢!
强制 pytest 使用 Faust 的异步事件循环作为默认的全局循环。将以下夹具添加到您的测试代码中:
@pytest.mark.asyncio()
@pytest.fixture()
def event_loop():
yield app.loop
The event_loop
fixture can be easily overridden in any of the standard pytest locations (e.g. directly in the test file, or in conftest.py) to use a non-default event loop. If the pytest.mark.asyncio
marker is applied, a pytest hook will ensure the produced loop is set as the default global loop.
我正在尝试使用 pytest 为我的 Faust 应用程序编写单元测试。我已经参考了文档 here 但它没有提到当我的 Faust 代理将数据发送到接收器时该怎么做。
没有水槽,我的测试工作正常,但是当我使用水槽时,我得到这个错误:
RuntimeError: Task <Task pending name='Task-2' coro=<Agent._execute_actor() running at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site-packages/faust/agents/agent.py:647> cb=[<TaskWakeupMethWrapper object at 0x7fc28967c5b0>()]> got Future <Future pending> attached to a different loop
INFO faust.agents.agent:logging.py:265 [^-AgentTestWrapper: ml_exporter.processDetections]: Stopping...
我尝试了各种方法:例如修补我的 Faust 应用程序中将数据发送到接收器的装饰器,尝试在没有装饰器的情况下测试我的功能(通过尝试绕过它),修补接收器参数在我的 Faust 应用程序中有一个 None 值(因此它不会将我的数据发送到接收器),等等。我没有任何运气。
这是我的浮士德特工:
app = faust.App('ml-exporter', broker=dx_broker, value_serializer='json')
detection_topic = app.topic(dx_topic)
graph_topic = app.topic(gwh_topic)
@app.agent(detection_topic, sink=[graph_topic])
async def processDetections(detections):
detection_count = 0
async for detection in detections:
detection_count += 1
# r.set("detection_count", detection_count)
yield detection
这是我当前的测试代码:
import ml_exporter
patch('ml_exporter.graph_topic', None)
def create_app():
return faust.App('ml-exporter', value_serializer='json')
@pytest.fixture()
def test_app(event_loop):
app = create_app()
app.finalize()
app.flow_control.resume()
return app
@pytest.mark.asyncio()
async def test_processDetections(test_app):
async with ml_exporter.processDetections.test_context() as agent:
event = await agent.put('hey')
assert agent.results[event.message.offset] == 'hey'
当我运行这个测试时,我得到了与上面提到的相同的错误。有什么方法可以成功测试我的 Faust 应用程序吗?
谢谢!
强制 pytest 使用 Faust 的异步事件循环作为默认的全局循环。将以下夹具添加到您的测试代码中:
@pytest.mark.asyncio()
@pytest.fixture()
def event_loop():
yield app.loop
The
event_loop
fixture can be easily overridden in any of the standard pytest locations (e.g. directly in the test file, or in conftest.py) to use a non-default event loop. If thepytest.mark.asyncio
marker is applied, a pytest hook will ensure the produced loop is set as the default global loop.