如何在 QThread 中捕获来自服务器 运行 的信号
How to catch a signal from server running in a QThread
我有 2 个 classes:
- 一台服务器
- 一个假人class
我需要能够从像 Blender 这样的程序中 运行 这个。由于 服务器 需要不断地监听任何请求,我需要 运行 它在一个单独的线程中,否则它会阻塞主线程 UI。 dummy 需要在主线程中 运行。
当 server 收到请求时,它需要发出一个信号,该信号将被 dummy 捕获。但是,无论我尝试什么,我似乎都无法捕捉到来自 server.
的信号
这里有一些示例代码可以 运行 没有 Blender:
server 和 dummy classes
import wsgiref.simple_server
import json
from PySide2.QtCore import *
class DummyClass(QObject):
def __init__(self):
super(DummyClass, self).__init__()
print("Started the dummy")
def catch_dummy_signal(self, name, dictionary):
print(name)
print(dictionary)
print("I CAUGHT THE DUMMY SIGNAL")
class Server(QObject):
DUMMY_SIGNAL = Signal(str, dict)
def __init__(self):
super(Server, self).__init__()
def start_listening(self):
simple_server = wsgiref.simple_server.make_server("127.0.0.1", 65500, self.process_request)
print(simple_server.server_address)
while True:
simple_server.handle_request()
def process_request(self, environ, start_response):
print("Got request")
status = "200 OK"
headers = [("Content-type", "text/plain")]
start_response(status, headers)
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
request_body = environ['wsgi.input'].read(request_body_size)
query = json.loads(request_body)
print("Emitting dummy signal")
self.DUMMY_SIGNAL.emit("Luke Skywalker", {"occupation": "Jedi Knight"})
command_result = {"query": query}
return [json.dumps(command_result).encode()]
if __name__ == '__main__':
server = Server()
dummy = DummyClass()
server.DUMMY_SIGNAL.connect(dummy.catch_dummy_signal)
thread = QThread()
server.moveToThread(thread)
thread.started.connect(server.start_listening)
thread.start()
# running an endless loop here, otherwise the program ends. This endless loop will not be used when running it inside a program like Blender
while True:
pass
此客户端代码可用于调用向发出 DUMMY_SIGNAL 的服务器发出请求,以便 dummy 捕获:
import requests
url = "http://127.0.0.1:65500"
payload = {"data": "This is important data!"}
response = requests.post(url, json=payload).json()
print(response)
服务器的输出如下所示:
Started the dummy
('127.0.0.1', 65500)
got request
Emitting dummy signal
127.0.0.1 - - [01/Oct/2020 16:19:33] "POST / HTTP/1.1" 200 46
客户端的输出如下所示:
{'query': {'data': 'This is important data!'}}
从服务器的输出中可以看出,catch_dummy_signal
中的打印语句未显示。所以我必须假设该函数从未被调用过。
如何通过向 服务器 发出请求来触发 catch_dummy_signal
?
信号仅在存在 Qt 事件循环时才有效,因为它是传输方式,在您的情况下,使用 QCoreApplication 就足够了:
if __name__ == "__main__":
<b>app = QCoreApplication([])</b>
server = Server()
dummy = DummyClass()
server.DUMMY_SIGNAL.connect(dummy.catch_dummy_signal)
thread = QThread()
server.moveToThread(thread)
thread.started.connect(server.start_listening)
thread.start()
<b>app.exec_()</b>
我有 2 个 classes:
- 一台服务器
- 一个假人class
我需要能够从像 Blender 这样的程序中 运行 这个。由于 服务器 需要不断地监听任何请求,我需要 运行 它在一个单独的线程中,否则它会阻塞主线程 UI。 dummy 需要在主线程中 运行。
当 server 收到请求时,它需要发出一个信号,该信号将被 dummy 捕获。但是,无论我尝试什么,我似乎都无法捕捉到来自 server.
的信号这里有一些示例代码可以 运行 没有 Blender:
server 和 dummy classes
import wsgiref.simple_server
import json
from PySide2.QtCore import *
class DummyClass(QObject):
def __init__(self):
super(DummyClass, self).__init__()
print("Started the dummy")
def catch_dummy_signal(self, name, dictionary):
print(name)
print(dictionary)
print("I CAUGHT THE DUMMY SIGNAL")
class Server(QObject):
DUMMY_SIGNAL = Signal(str, dict)
def __init__(self):
super(Server, self).__init__()
def start_listening(self):
simple_server = wsgiref.simple_server.make_server("127.0.0.1", 65500, self.process_request)
print(simple_server.server_address)
while True:
simple_server.handle_request()
def process_request(self, environ, start_response):
print("Got request")
status = "200 OK"
headers = [("Content-type", "text/plain")]
start_response(status, headers)
request_body_size = int(environ.get('CONTENT_LENGTH', 0))
request_body = environ['wsgi.input'].read(request_body_size)
query = json.loads(request_body)
print("Emitting dummy signal")
self.DUMMY_SIGNAL.emit("Luke Skywalker", {"occupation": "Jedi Knight"})
command_result = {"query": query}
return [json.dumps(command_result).encode()]
if __name__ == '__main__':
server = Server()
dummy = DummyClass()
server.DUMMY_SIGNAL.connect(dummy.catch_dummy_signal)
thread = QThread()
server.moveToThread(thread)
thread.started.connect(server.start_listening)
thread.start()
# running an endless loop here, otherwise the program ends. This endless loop will not be used when running it inside a program like Blender
while True:
pass
此客户端代码可用于调用向发出 DUMMY_SIGNAL 的服务器发出请求,以便 dummy 捕获:
import requests
url = "http://127.0.0.1:65500"
payload = {"data": "This is important data!"}
response = requests.post(url, json=payload).json()
print(response)
服务器的输出如下所示:
Started the dummy
('127.0.0.1', 65500)
got request
Emitting dummy signal
127.0.0.1 - - [01/Oct/2020 16:19:33] "POST / HTTP/1.1" 200 46
客户端的输出如下所示:
{'query': {'data': 'This is important data!'}}
从服务器的输出中可以看出,catch_dummy_signal
中的打印语句未显示。所以我必须假设该函数从未被调用过。
如何通过向 服务器 发出请求来触发 catch_dummy_signal
?
信号仅在存在 Qt 事件循环时才有效,因为它是传输方式,在您的情况下,使用 QCoreApplication 就足够了:
if __name__ == "__main__":
<b>app = QCoreApplication([])</b>
server = Server()
dummy = DummyClass()
server.DUMMY_SIGNAL.connect(dummy.catch_dummy_signal)
thread = QThread()
server.moveToThread(thread)
thread.started.connect(server.start_listening)
thread.start()
<b>app.exec_()</b>