python 与 AsyncKernelManager 异步且 Qt 未执行
python async with AsyncKernelManager and Qt not executing
我正在尝试在 Qt 应用程序的 jupyter 内核中执行代码。我有下面的代码片段应该异步 运行 代码然后打印结果
import sys
import asyncio
import qasync
from qasync import QApplication
from PySide6.QtWidgets import QWidget
from jupyter_client import AsyncKernelManager
CODE = """print('test')"""
class Test():
def __init__(self):
kernel_manager = AsyncKernelManager()
kernel_manager.start_kernel()
self.client = kernel_manager.client()
self.client.start_channels()
def run(self):
loop = asyncio.get_event_loop()
asyncio.ensure_future(self.execute(), loop=loop)
async def execute(self):
self.client.execute(CODE)
response: Coroutine = self.client.get_shell_msg()
print('Before')
res = await response
print('After')
def main():
app = QApplication(sys.argv)
test = Test()
test.run()
sys.exit(app.exec())
main()
通过以上我得到以下输出
/tmp/test/test.py:16: RuntimeWarning: coroutine 'KernelManager._async_start_kernel' was never awaited
kernel_manager.start_kernel()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/tmp/test/test.py:22: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
所以尝试根据 qasync 中的示例调整代码
类似于
async def main():
app = QApplication(sys.argv)
test = Test()
test.run()
sys.exit(app.exec())
qasync.run(main())
会导致以下异常
Traceback (most recent call last):
File "/tmp/test/test.py", line 40, in <module>
qasync.run(main())
File "/tmp/test/.venv/lib/python3.10/site-packages/qasync/__init__.py", line 821, in run
return asyncio.run(*args, **kwargs)
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/tmp/test/.venv/lib/python3.10/site-packages/qasync/__init__.py", line 409, in run_until_complete
return future.result()
File "/tmp/test/test.py", line 34, in main
app = QApplication(sys.argv)
RuntimeError: Please destroy the QApplication singleton before creating a new QApplication instance.
我现在很迷茫,有人知道如何让它工作吗?
你必须创建一个QEventLoop,而且start_kernel必须使用await。另一方面,它首先导入 PySide6,然后导入依赖于 PySide6 的其他库,如 qasync,以便它可以推断出正确的 Qt 绑定。
import sys
import asyncio
from functools import cached_property
from PySide6.QtWidgets import QApplication
import qasync
from jupyter_client import AsyncKernelManager
CODE = """print('test')"""
class Test:
@cached_property
def kernel_manager(self):
return AsyncKernelManager()
@cached_property
def client(self):
return self.kernel_manager.client()
async def start(self):
await self.kernel_manager.start_kernel()
self.client.start_channels()
asyncio.ensure_future(self.execute())
async def execute(self):
self.client.execute(CODE)
response = self.client.get_shell_msg()
print("Before")
res = await response
print("After", res)
def main():
app = QApplication(sys.argv)
loop = qasync.QEventLoop(app)
asyncio.set_event_loop(loop)
test = Test()
asyncio.ensure_future(test.start())
with loop:
loop.run_forever()
if __name__ == "__main__":
main()
我正在尝试在 Qt 应用程序的 jupyter 内核中执行代码。我有下面的代码片段应该异步 运行 代码然后打印结果
import sys
import asyncio
import qasync
from qasync import QApplication
from PySide6.QtWidgets import QWidget
from jupyter_client import AsyncKernelManager
CODE = """print('test')"""
class Test():
def __init__(self):
kernel_manager = AsyncKernelManager()
kernel_manager.start_kernel()
self.client = kernel_manager.client()
self.client.start_channels()
def run(self):
loop = asyncio.get_event_loop()
asyncio.ensure_future(self.execute(), loop=loop)
async def execute(self):
self.client.execute(CODE)
response: Coroutine = self.client.get_shell_msg()
print('Before')
res = await response
print('After')
def main():
app = QApplication(sys.argv)
test = Test()
test.run()
sys.exit(app.exec())
main()
通过以上我得到以下输出
/tmp/test/test.py:16: RuntimeWarning: coroutine 'KernelManager._async_start_kernel' was never awaited
kernel_manager.start_kernel()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
/tmp/test/test.py:22: DeprecationWarning: There is no current event loop
loop = asyncio.get_event_loop()
所以尝试根据 qasync 中的示例调整代码 类似于
async def main():
app = QApplication(sys.argv)
test = Test()
test.run()
sys.exit(app.exec())
qasync.run(main())
会导致以下异常
Traceback (most recent call last):
File "/tmp/test/test.py", line 40, in <module>
qasync.run(main())
File "/tmp/test/.venv/lib/python3.10/site-packages/qasync/__init__.py", line 821, in run
return asyncio.run(*args, **kwargs)
File "/usr/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/tmp/test/.venv/lib/python3.10/site-packages/qasync/__init__.py", line 409, in run_until_complete
return future.result()
File "/tmp/test/test.py", line 34, in main
app = QApplication(sys.argv)
RuntimeError: Please destroy the QApplication singleton before creating a new QApplication instance.
我现在很迷茫,有人知道如何让它工作吗?
你必须创建一个QEventLoop,而且start_kernel必须使用await。另一方面,它首先导入 PySide6,然后导入依赖于 PySide6 的其他库,如 qasync,以便它可以推断出正确的 Qt 绑定。
import sys
import asyncio
from functools import cached_property
from PySide6.QtWidgets import QApplication
import qasync
from jupyter_client import AsyncKernelManager
CODE = """print('test')"""
class Test:
@cached_property
def kernel_manager(self):
return AsyncKernelManager()
@cached_property
def client(self):
return self.kernel_manager.client()
async def start(self):
await self.kernel_manager.start_kernel()
self.client.start_channels()
asyncio.ensure_future(self.execute())
async def execute(self):
self.client.execute(CODE)
response = self.client.get_shell_msg()
print("Before")
res = await response
print("After", res)
def main():
app = QApplication(sys.argv)
loop = qasync.QEventLoop(app)
asyncio.set_event_loop(loop)
test = Test()
asyncio.ensure_future(test.start())
with loop:
loop.run_forever()
if __name__ == "__main__":
main()