如何在 QThread 中捕获来自服务器 运行 的信号

How to catch a signal from server running in a QThread

我有 2 个 classes:

我需要能够从像 Blender 这样的程序中 运行 这个。由于 服务器 需要不断地监听任何请求,我需要 运行 它在一个单独的线程中,否则它会阻塞主线程 UI。 dummy 需要在主线程中 运行。

server 收到请求时,它需要发出一个信号,该信号将被 dummy 捕获。但是,无论我尝试什么,我似乎都无法捕捉到来自 server.

的信号

这里有一些示例代码可以 运行 没有 Blender:

serverdummy classes

import wsgiref.simple_server
import json
from PySide2.QtCore import *

class DummyClass(QObject):
    def __init__(self):
        super(DummyClass, self).__init__()
        print("Started the dummy")

    def catch_dummy_signal(self, name, dictionary):
        print(name)
        print(dictionary)
        print("I CAUGHT THE DUMMY SIGNAL")


class Server(QObject):
    DUMMY_SIGNAL = Signal(str, dict)
    def __init__(self):
        super(Server, self).__init__()

    def start_listening(self):
        simple_server = wsgiref.simple_server.make_server("127.0.0.1", 65500, self.process_request)
        print(simple_server.server_address)
        while True:
            simple_server.handle_request()

    def process_request(self, environ, start_response):
        print("Got request")
        status = "200 OK"
        headers = [("Content-type", "text/plain")]
        start_response(status, headers)
        request_body_size = int(environ.get('CONTENT_LENGTH', 0))
        request_body = environ['wsgi.input'].read(request_body_size)
        query = json.loads(request_body)

        print("Emitting dummy signal")
        self.DUMMY_SIGNAL.emit("Luke Skywalker", {"occupation": "Jedi Knight"})

        command_result = {"query": query}
        return [json.dumps(command_result).encode()]

if __name__ == '__main__':
    server = Server()
    dummy = DummyClass()

    server.DUMMY_SIGNAL.connect(dummy.catch_dummy_signal)

    thread = QThread()
    server.moveToThread(thread)
    thread.started.connect(server.start_listening)
    thread.start()


    # running an endless loop here, otherwise the program ends. This endless loop will not be used when running it inside a program like Blender
    while True:
        pass

此客户端代码可用于调用向发出 DUMMY_SIGNAL 的服务器发出请求,以便 dummy 捕获:

import requests
url = "http://127.0.0.1:65500"
payload = {"data": "This is important data!"}

response = requests.post(url, json=payload).json()
print(response)

服务器的输出如下所示:

Started the dummy
('127.0.0.1', 65500)
got request
Emitting dummy signal
127.0.0.1 - - [01/Oct/2020 16:19:33] "POST / HTTP/1.1" 200 46

客户端的输出如下所示:

{'query': {'data': 'This is important data!'}}

从服务器的输出中可以看出,catch_dummy_signal 中的打印语句未显示。所以我必须假设该函数从未被调用过。

如何通过向 服务器 发出请求来触发 catch_dummy_signal

信号仅在存在 Qt 事件循环时才有效,因为它是传输方式,在您的情况下,使用 QCoreApplication 就足够了:

if __name__ == "__main__":

    <b>app = QCoreApplication([])</b>
    server = Server()
    dummy = DummyClass()

    server.DUMMY_SIGNAL.connect(dummy.catch_dummy_signal)

    thread = QThread()
    server.moveToThread(thread)
    thread.started.connect(server.start_listening)
    thread.start()

    <b>app.exec_()</b>