高速公路 Python 带 Pytest 的 RPC

Autobahn Python RPC with Pytest

所以在应用程序中我必须测试 RPC 调用(有一个用 Autobahn Python 制作的网络服务器)。我想对那些进行集成测试。所以我正在使用 Pytest。我这样做的方式是这样的: 1) 我有一个这样的测试 class :

@pytest.mark.incremental
class TestCreateUser:

    @pytest.fixture(scope="function", params="Json Test data")
    def create_user_data(self, request):
        return request.param

    def test_valid_user_creation(self, create_user_data):  
        ws_realm = config["websocket"]["realm"]

        runner = ApplicationRunner(
            url="%s://%s:%s/ws" % (
            config["websocket"]["protocol"], config["websocket"]["host"], config["websocket"]["port"]),
            realm=ws_realm,
            extra={"method": u"appname.create_user", "email": create_user_data["username"], create_user_data["password"]}
        )
        runner.run(MyComponent)

然后我有一个组件叫做 "MyComponent" :

def __init__(self, config=None):
        super().__init__(config)

    @asyncio.coroutine
    def onJoin(self, details):
        try:
            result = yield from self.call(self.config.extra["method"], self.config.extra["email"], self.config.extra["password"])
        except Exception as e:
            print("Error: {}".format(e))
        else:
            print(result)

        self.leave()

    def onDisconnect(self):
        asyncio.get_event_loop().stop()

所以我遇到了一些问题,首先我无法在测试中获得 RPC 调用的结果 class,其次当我 运行 多次 "test_valid_user_creation"(与夹具系统)我得到了这样的错误:

./../.pyenv/versions/novalibrary/lib/python3.4/site-packages/autobahn/asyncio/wamp.py:167: in run
    (transport, protocol) = loop.run_until_complete(coro)
../../.pyenv/versions/3.4.3/lib/python3.4/asyncio/base_events.py:293: in run_until_complete
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_UnixSelectorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

../../.pyenv/versions/3.4.3/lib/python3.4/asyncio/base_events.py:265: RuntimeError

所以我的问题是如何检索 RPC 调用的值以及如何在使用夹具多次调用测试时无法获取 "Event loop is closed "。

谢谢。

所以基本上,为了 return 我创建另一个模块 "shared_result" 的值,然后我把一个全局变量放在那里 return 从组件到测试的值 class.至于我得到的 "Event loop is closed" 异常,我在每次调用跑步者之前都会这样做:

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

这是我找到解决方案的地方: https://github.com/aio-libs/aiohttp/issues/441

如果有人可以对此发表评论并解释为什么需要这样做并为我提供更好的解决方案,那就太好了。