结合 python asyncio 与 pyee 事件发射器
Combine python asyncio with pyee Event Emitter
我正在尝试使用 pyee library 中的 AsyncIOEventEmitter
但没有成功。由于某种原因,发出的事件“Hi”永远不会到达 async_handler
来完成异步未来。我也没有在网上找到合适的例子。此外,我尝试提供当前事件并为 AsyncIOEventEmitter
使用新的事件循环,但两者都会产生相同的结果。
有人可以帮帮我吗?下面的示例单元测试:
import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter
LOG = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_setup(event_loop):
LOG.info("1 - start")
event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())
# Create a new Future object.
future_result = event_loop.create_future()
LOG.info("2 - emit event")
event_emitter.emit("event", "Hi")
@event_emitter.on("event")
async def async_handler(message):
LOG.info(">>> %s", message)
future_result.set_result(message)
return future_result
# Wait until *future_result* has a result and print it.
LOG.info(await future_result)
谢谢!
好的,明白了,async_handler
方法必须在测试的前面定义...
现在有效:
"""Event emitter playground"""
import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter
LOG = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_setup(event_loop):
"""Receive event from emitter and complete future!"""
LOG.info("1 - start")
event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())
@event_emitter.on("event")
def async_handler(message):
LOG.info(">>> %s", message)
future_result.set_result(message)
future_result = event_loop.create_future()
LOG.info("2 - emit event")
event_emitter.emit("event", "Hi")
LOG.info(await future_result)
我正在尝试使用 pyee library 中的 AsyncIOEventEmitter
但没有成功。由于某种原因,发出的事件“Hi”永远不会到达 async_handler
来完成异步未来。我也没有在网上找到合适的例子。此外,我尝试提供当前事件并为 AsyncIOEventEmitter
使用新的事件循环,但两者都会产生相同的结果。
有人可以帮帮我吗?下面的示例单元测试:
import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter
LOG = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_setup(event_loop):
LOG.info("1 - start")
event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())
# Create a new Future object.
future_result = event_loop.create_future()
LOG.info("2 - emit event")
event_emitter.emit("event", "Hi")
@event_emitter.on("event")
async def async_handler(message):
LOG.info(">>> %s", message)
future_result.set_result(message)
return future_result
# Wait until *future_result* has a result and print it.
LOG.info(await future_result)
谢谢!
好的,明白了,async_handler
方法必须在测试的前面定义...
现在有效:
"""Event emitter playground"""
import asyncio
import logging
import pytest
from pyee import AsyncIOEventEmitter
LOG = logging.getLogger(__name__)
@pytest.mark.asyncio
async def test_setup(event_loop):
"""Receive event from emitter and complete future!"""
LOG.info("1 - start")
event_emitter = AsyncIOEventEmitter(asyncio.new_event_loop())
@event_emitter.on("event")
def async_handler(message):
LOG.info(">>> %s", message)
future_result.set_result(message)
future_result = event_loop.create_future()
LOG.info("2 - emit event")
event_emitter.emit("event", "Hi")
LOG.info(await future_result)