我如何 运行 一个 thrift 服务器或 TCP 服务器作为 pytest fixture?
How can I run a thrift server or TCP server as a pytest fixture?
平台 linux2,python 2.7.12-final-0
我的集成测试 ping 外部服务器,但我想将其更改为使用测试装置。
几天来我一直在尝试 运行 Thrift 库 TCP 服务器作为 pytest 中的会话范围固定装置。我已经尝试 运行 服务器作为后台线程,这样我的测试就不会被 运行ning 阻止。
@pytest.fixture(scope="session", autouse=True)
def thrift_server():
config = get_config()
nprocesses = 4
try:
nprocesses = config['num_processes']
except:
pass
args = (Handler, config['db_credentials'], config['server_port'])
kwargs = ({'env': config['env'], 'processpool': True, 'num_processes': nprocesses,
'handler_config': config, 'logfile': 'tests/test_server_log.txt'})
tserver = ThriftServer()
tserver.add_path(config['thrift_path'])
tserver.set_service("search")
try:
thread = threading.Thread(target=tserver.runserver, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
yield tserver
except:
print("BOOHOO")
print("TEARDOWN: test server")
对于我的每个测试,我从 pytest 库代码中得到一个错误:
ERROR at setup of Tests.test_item_is_found
venv/lib/python2.7/site-packages/_pytest/runner.py:226: in from_call
result = func()
venv/lib/python2.7/site-packages/_pytest/runner.py:198: in <lambda>
lambda: ihook(item=item, **kwds), when=when, reraise=reraise
venv/lib/python2.7/site-packages/pluggy/hooks.py:289: in __call__
return self._hookexec(self, self.get_hookimpls(), kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:87: in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:81: in <lambda>
firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
venv/lib/python2.7/site-packages/_pytest/runner.py:116: in pytest_runtest_setup
item.session._setupstate.prepare(item)
venv/lib/python2.7/site-packages/_pytest/runner.py:362: in prepare
col.setup()
venv/lib/python2.7/site-packages/_pytest/unittest.py:119: in setup
self._request._fillfixtures()
venv/lib/python2.7/site-packages/_pytest/fixtures.py:469: in _fillfixtures
item.funcargs[argname] = self.getfixturevalue(argname)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:479: in getfixturevalue
return self._get_active_fixturedef(argname).cached_result[0]
venv/lib/python2.7/site-packages/_pytest/fixtures.py:502: in _get_active_fixturedef
self._compute_fixture_value(fixturedef)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:587: in _compute_fixture_value
fixturedef.execute(request=subrequest)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:894: in execute
return hook.pytest_fixture_setup(fixturedef=self, request=request)
venv/lib/python2.7/site-packages/pluggy/hooks.py:289: in __call__
return self._hookexec(self, self.get_hookimpls(), kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:87: in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:81: in <lambda>
firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
venv/lib/python2.7/site-packages/_pytest/fixtures.py:936: in pytest_fixture_setup
result = call_fixture_func(fixturefunc, request, kwargs)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:791: in call_fixture_func
res = next(it)
E StopIteration
这是一个我没有找到很好的文档或支持的问题,所以我不得不问这个问题..我怎样才能将 TCP 服务器设置为我的测试可以使用的 pytest fixture?
作为推论,这是我应该自己动手的情况,还是有一个很好的插件来支持这个 pytest 用例?同样,我的搜索结果很少。
我能够使用以下设置将 Thrift 服务器设置为 pytest fixture:
import pytest
import threading
from pythrift import ThriftServer
from myHandler import Handler
@pytest.fixture(scope="session", autouse=True)
def thrift_server():
config = get_config()
nprocesses = 4
try:
nprocesses = config['num_processes']
except:
pass
args = (Handler, config['db_credentials'], config['server_port'])
kwargs = ({'env': config['env'], 'processpool': True, 'num_processes': nprocesses,
'handler_config': config, 'logfile': 'tests/test_server_log.txt'})
tserver = ThriftServer()
tserver.add_path(config['thrift_path'])
tserver.set_service("search")
try:
thread = threading.Thread(target=tserver.run_server, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
yield tserver
except Exception as e:
print(e)
print("TEARDOWN: test server")
As a corollary, is this a situation where I should roll-my-own or is
there a good plugin to support this pytest use-case? Again, my search
has turned up little on this.
如果有人遇到同样的情况,pytest-xprocess 在这里应该会有用。您可以像下面这样编写一个简单的固定装置来启动您的服务器实例并使其在您的测试中可用:
# content of conftest.py
import pytest
from xprocess import ProcessStarter
@pytest.fixture
def myserver(xprocess):
class Starter(ProcessStarter):
# startup pattern
pattern = "PATTERN"
# command to start process
args = ['command', 'arg1', 'arg2']
# ensure process is running and return its logfile
logfile = xprocess.ensure("myserver", Starter)
conn = # create a connection or url/port info to the server
yield conn
# clean up whole process tree afterwards
xprocess.getinfo("myserver").terminate()
平台 linux2,python 2.7.12-final-0
我的集成测试 ping 外部服务器,但我想将其更改为使用测试装置。
几天来我一直在尝试 运行 Thrift 库 TCP 服务器作为 pytest 中的会话范围固定装置。我已经尝试 运行 服务器作为后台线程,这样我的测试就不会被 运行ning 阻止。
@pytest.fixture(scope="session", autouse=True)
def thrift_server():
config = get_config()
nprocesses = 4
try:
nprocesses = config['num_processes']
except:
pass
args = (Handler, config['db_credentials'], config['server_port'])
kwargs = ({'env': config['env'], 'processpool': True, 'num_processes': nprocesses,
'handler_config': config, 'logfile': 'tests/test_server_log.txt'})
tserver = ThriftServer()
tserver.add_path(config['thrift_path'])
tserver.set_service("search")
try:
thread = threading.Thread(target=tserver.runserver, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
yield tserver
except:
print("BOOHOO")
print("TEARDOWN: test server")
对于我的每个测试,我从 pytest 库代码中得到一个错误:
ERROR at setup of Tests.test_item_is_found
venv/lib/python2.7/site-packages/_pytest/runner.py:226: in from_call
result = func()
venv/lib/python2.7/site-packages/_pytest/runner.py:198: in <lambda>
lambda: ihook(item=item, **kwds), when=when, reraise=reraise
venv/lib/python2.7/site-packages/pluggy/hooks.py:289: in __call__
return self._hookexec(self, self.get_hookimpls(), kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:87: in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:81: in <lambda>
firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
venv/lib/python2.7/site-packages/_pytest/runner.py:116: in pytest_runtest_setup
item.session._setupstate.prepare(item)
venv/lib/python2.7/site-packages/_pytest/runner.py:362: in prepare
col.setup()
venv/lib/python2.7/site-packages/_pytest/unittest.py:119: in setup
self._request._fillfixtures()
venv/lib/python2.7/site-packages/_pytest/fixtures.py:469: in _fillfixtures
item.funcargs[argname] = self.getfixturevalue(argname)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:479: in getfixturevalue
return self._get_active_fixturedef(argname).cached_result[0]
venv/lib/python2.7/site-packages/_pytest/fixtures.py:502: in _get_active_fixturedef
self._compute_fixture_value(fixturedef)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:587: in _compute_fixture_value
fixturedef.execute(request=subrequest)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:894: in execute
return hook.pytest_fixture_setup(fixturedef=self, request=request)
venv/lib/python2.7/site-packages/pluggy/hooks.py:289: in __call__
return self._hookexec(self, self.get_hookimpls(), kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:87: in _hookexec
return self._inner_hookexec(hook, methods, kwargs)
venv/lib/python2.7/site-packages/pluggy/manager.py:81: in <lambda>
firstresult=hook.spec.opts.get("firstresult") if hook.spec else False,
venv/lib/python2.7/site-packages/_pytest/fixtures.py:936: in pytest_fixture_setup
result = call_fixture_func(fixturefunc, request, kwargs)
venv/lib/python2.7/site-packages/_pytest/fixtures.py:791: in call_fixture_func
res = next(it)
E StopIteration
这是一个我没有找到很好的文档或支持的问题,所以我不得不问这个问题..我怎样才能将 TCP 服务器设置为我的测试可以使用的 pytest fixture?
作为推论,这是我应该自己动手的情况,还是有一个很好的插件来支持这个 pytest 用例?同样,我的搜索结果很少。
我能够使用以下设置将 Thrift 服务器设置为 pytest fixture:
import pytest
import threading
from pythrift import ThriftServer
from myHandler import Handler
@pytest.fixture(scope="session", autouse=True)
def thrift_server():
config = get_config()
nprocesses = 4
try:
nprocesses = config['num_processes']
except:
pass
args = (Handler, config['db_credentials'], config['server_port'])
kwargs = ({'env': config['env'], 'processpool': True, 'num_processes': nprocesses,
'handler_config': config, 'logfile': 'tests/test_server_log.txt'})
tserver = ThriftServer()
tserver.add_path(config['thrift_path'])
tserver.set_service("search")
try:
thread = threading.Thread(target=tserver.run_server, args=args, kwargs=kwargs)
thread.daemon = True
thread.start()
yield tserver
except Exception as e:
print(e)
print("TEARDOWN: test server")
As a corollary, is this a situation where I should roll-my-own or is there a good plugin to support this pytest use-case? Again, my search has turned up little on this.
如果有人遇到同样的情况,pytest-xprocess 在这里应该会有用。您可以像下面这样编写一个简单的固定装置来启动您的服务器实例并使其在您的测试中可用:
# content of conftest.py
import pytest
from xprocess import ProcessStarter
@pytest.fixture
def myserver(xprocess):
class Starter(ProcessStarter):
# startup pattern
pattern = "PATTERN"
# command to start process
args = ['command', 'arg1', 'arg2']
# ensure process is running and return its logfile
logfile = xprocess.ensure("myserver", Starter)
conn = # create a connection or url/port info to the server
yield conn
# clean up whole process tree afterwards
xprocess.getinfo("myserver").terminate()