如何将方法放入线程并在执行测试时使用它
how to put a method in to thread and use it when performing the test
我有这部分代码正在对 redis 进行 psubscribe。我想 运行 这部分代码在后台工作的线程中,而另一部分代码将检查下面的一些通知。
def psubscribe(context, param1, param2, param3):
context.test_config = load_config()
RedisConnector(context.test_config["redis_host"],
context.test_config["redis_db_index"])
redis_notification_subscriber_connector = RedisConnector(context.test_config["notification__redis_host"],
int(param3),
int(context.test_config[
"notification_redis_port"]))
context.redis_connectors = redis_notification_connector.psubscribe_to_redis_event(param1,
timeout_seconds=int(
param2)
)
到目前为止我做了什么:但它不是运行宁:(
context.t = threading.Thread(target=psubscribe, args=['param1', 'param2', 'param3'])
context.t.start()
它确实有效。我认为您实际上不需要将 context
变量传递给 psubscribe
函数。
这是一个例子:
- 启动监听 8000 端口的 http 服务器作为后台线程
- 向它发送 http 请求并验证响应
特色场景:
Scenario: Run background process and validate responses
Given Start background process
Then Validate outputs
background_steps.py 文件:
import threading
import logging
from behave import *
from features.steps.utils import run_server
import requests
@given("Start background process")
def step_impl(context):
context.t = threading.Thread(target=run_server, args=[8000])
context.t.daemon = True
context.t.start()
@then("Validate outputs")
def step_impl(context):
response = requests.get('http://127.0.0.1:8000')
assert response.status_code == 501
utils.py 文件
from http.server import HTTPServer, BaseHTTPRequestHandler
def run_server(port, server_class=HTTPServer, handler_class=BaseHTTPRequestHandler):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
httpd.serve_forever()
我有这部分代码正在对 redis 进行 psubscribe。我想 运行 这部分代码在后台工作的线程中,而另一部分代码将检查下面的一些通知。
def psubscribe(context, param1, param2, param3):
context.test_config = load_config()
RedisConnector(context.test_config["redis_host"],
context.test_config["redis_db_index"])
redis_notification_subscriber_connector = RedisConnector(context.test_config["notification__redis_host"],
int(param3),
int(context.test_config[
"notification_redis_port"]))
context.redis_connectors = redis_notification_connector.psubscribe_to_redis_event(param1,
timeout_seconds=int(
param2)
)
到目前为止我做了什么:但它不是运行宁:(
context.t = threading.Thread(target=psubscribe, args=['param1', 'param2', 'param3'])
context.t.start()
它确实有效。我认为您实际上不需要将 context
变量传递给 psubscribe
函数。
这是一个例子:
- 启动监听 8000 端口的 http 服务器作为后台线程
- 向它发送 http 请求并验证响应
特色场景:
Scenario: Run background process and validate responses
Given Start background process
Then Validate outputs
background_steps.py 文件:
import threading
import logging
from behave import *
from features.steps.utils import run_server
import requests
@given("Start background process")
def step_impl(context):
context.t = threading.Thread(target=run_server, args=[8000])
context.t.daemon = True
context.t.start()
@then("Validate outputs")
def step_impl(context):
response = requests.get('http://127.0.0.1:8000')
assert response.status_code == 501
utils.py 文件
from http.server import HTTPServer, BaseHTTPRequestHandler
def run_server(port, server_class=HTTPServer, handler_class=BaseHTTPRequestHandler):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
httpd.serve_forever()