在 PyQt 应用程序中嵌入 aiohttp 服务器
embed a aiohttp server in a PyQt application
我打算在 PyQt 应用程序中嵌入一个 aiohttp 服务器,但是当我 运行 下面的代码时,Qt window 无法显示,我知道这是由 web.run_app(app)
,我试图将它移动到一个线程中,但后来我得到了 RuntimeError: There is no current event loop in thread 'Dummy-1'
,那我该怎么办?我发现 asyncqt 这可能有帮助,但我不知道如何使用它来处理 aiohttp 服务器。
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from aiohttp import web
class ThreadGo(QThread): # threading.Thread
# implementing new slots in a QThread subclass is error-prone and discouraged.
def __init__(self, parent, func, *args, **kwargs):
super().__init__(parent)
self.func = func
self.args = args
self.kwargs = kwargs
self.result = 0
onFinished = self.kwargs.get('onFinished')
self.finished.connect(onFinished) if onFinished else None # 用lambda还不行呢
self.finished.connect(self.deleteLater)
self.start()
def run(self):
self.result = self.func(*self.args) # deleteLater
class Window(QMainWindow):
def __init__(self, parent=None, **kwargs):
super().__init__(parent, **kwargs)
self.setUpHTTPServer()
def setUpHTTPServer(self):
async def hello(request):
return web.Response(text="Hello, world")
app = web.Application()
app.add_routes([web.get('/', hello)])
web.run_app(app)
# ThreadGo(self, lambda:web.run_app(app))#get RuntimeError: There is no current event loop in thread 'Dummy-1
if __name__ == "__main__":
from sys import argv, exit
a = QApplication(argv)
w = Window()
w.show()
exit(a.exec_())
在下面的示例中,我展示了如何将 Qt 与 aiohttp 服务器一起使用:
import asyncio
from functools import cached_property
from PyQt5.QtWidgets import QApplication, QMainWindow
from asyncqt import QEventLoop
from aiohttp import web
class Window(QMainWindow):
def __init__(self, parent=None, **kwargs):
super().__init__(parent, **kwargs)
self.setup_server()
def setup_server(self):
self.app.add_routes([web.get("/", self.hello)])
@cached_property
def app(self):
return web.Application()
async def hello(self, request):
return web.Response(text="Hello, world")
def run(self):
web.run_app(self.app)
def main():
import sys
a = QApplication(sys.argv)
loop = QEventLoop(a)
asyncio.set_event_loop(loop)
w = Window()
w.show()
w.run()
if __name__ == "__main__":
main()
我打算在 PyQt 应用程序中嵌入一个 aiohttp 服务器,但是当我 运行 下面的代码时,Qt window 无法显示,我知道这是由 web.run_app(app)
,我试图将它移动到一个线程中,但后来我得到了 RuntimeError: There is no current event loop in thread 'Dummy-1'
,那我该怎么办?我发现 asyncqt 这可能有帮助,但我不知道如何使用它来处理 aiohttp 服务器。
from PyQt5.QtCore import *
from PyQt5.QtGui import *
from PyQt5.QtWidgets import *
from aiohttp import web
class ThreadGo(QThread): # threading.Thread
# implementing new slots in a QThread subclass is error-prone and discouraged.
def __init__(self, parent, func, *args, **kwargs):
super().__init__(parent)
self.func = func
self.args = args
self.kwargs = kwargs
self.result = 0
onFinished = self.kwargs.get('onFinished')
self.finished.connect(onFinished) if onFinished else None # 用lambda还不行呢
self.finished.connect(self.deleteLater)
self.start()
def run(self):
self.result = self.func(*self.args) # deleteLater
class Window(QMainWindow):
def __init__(self, parent=None, **kwargs):
super().__init__(parent, **kwargs)
self.setUpHTTPServer()
def setUpHTTPServer(self):
async def hello(request):
return web.Response(text="Hello, world")
app = web.Application()
app.add_routes([web.get('/', hello)])
web.run_app(app)
# ThreadGo(self, lambda:web.run_app(app))#get RuntimeError: There is no current event loop in thread 'Dummy-1
if __name__ == "__main__":
from sys import argv, exit
a = QApplication(argv)
w = Window()
w.show()
exit(a.exec_())
在下面的示例中,我展示了如何将 Qt 与 aiohttp 服务器一起使用:
import asyncio
from functools import cached_property
from PyQt5.QtWidgets import QApplication, QMainWindow
from asyncqt import QEventLoop
from aiohttp import web
class Window(QMainWindow):
def __init__(self, parent=None, **kwargs):
super().__init__(parent, **kwargs)
self.setup_server()
def setup_server(self):
self.app.add_routes([web.get("/", self.hello)])
@cached_property
def app(self):
return web.Application()
async def hello(self, request):
return web.Response(text="Hello, world")
def run(self):
web.run_app(self.app)
def main():
import sys
a = QApplication(sys.argv)
loop = QEventLoop(a)
asyncio.set_event_loop(loop)
w = Window()
w.show()
w.run()
if __name__ == "__main__":
main()