Python aiohttp + asyncio:如何在 loop.run_forever() 之后执行代码
Python aiohttp + asyncio: How to execute code after loop.run_forever()
代码:
#!/usr/bin/env python
import asyncio
import os
import socket
import time
import traceback
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
# CPU count reported by multiprocessing; the entry point uses it to decide
# how many server jobs to submit.
CPU_COUNT = cpu_count()
print("CPU Count: {}".format(CPU_COUNT))
def mk_socket(host="127.0.0.1", port=9090, reuseport=False):
    """Create a TCP/IPv4 socket bound to ``(host, port)``.

    Args:
        host: Address to bind to.
        port: TCP port to bind to.
        reuseport: When true, set ``SO_REUSEPORT`` on the socket before
            binding it.

    Returns:
        The bound socket; ``listen()`` has not been called on it.

    Raises:
        OSError: If ``reuseport`` is requested but the ``socket`` module does
            not expose ``SO_REUSEPORT`` on this platform, or if setting the
            option or binding fails.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if reuseport:
            # The numeric value of SO_REUSEPORT is platform specific (15 is
            # only correct on Linux), so take it from the socket module.
            so_reuseport = getattr(socket, "SO_REUSEPORT", None)
            if so_reuseport is None:
                raise OSError("SO_REUSEPORT is not supported on this platform")
            sock.setsockopt(socket.SOL_SOCKET, so_reuseport, 1)
        sock.bind((host, port))
    except Exception:
        # Do not leak the descriptor when configuring or binding fails.
        sock.close()
        raise
    return sock
async def index(request):
    """Serve the Icecast index page, or an HTML error page if it is unreadable.

    Args:
        request: The incoming request (unused).

    Returns:
        A ``text/html`` ``web.Response`` holding the file contents, or a short
        HTML document showing the error message when reading the file fails.
    """
    import html  # local import: only needed to escape the error text

    icecast_index_path = os.path.abspath("../test/icecast/icecast_index.html")
    print(icecast_index_path)
    try:
        # The context manager closes the file even when read() raises.
        with open(icecast_index_path, encoding="utf8") as index_file:
            content = index_file.read()
    except (OSError, UnicodeError) as e:
        # Escape the message so characters such as "<" or "&" in it cannot
        # break the surrounding markup.
        message = html.escape(str(e), quote=False)
        return web.Response(content_type="text/html", text="<!doctype html><body><h1>Error: " + message + "</h1></body></html>")
    return web.Response(content_type="text/html", text=content)
async def start_server():
    """Start the aiohttp application on a pre-bound socket.

    Builds the application with its single ``/`` route, sets up an
    ``AppRunner`` and serves it through a ``SockSite`` on a socket created by
    ``mk_socket`` for 127.0.0.1:8080 with ``reuseport`` enabled.

    Returns:
        A ``(site, app, runner)`` tuple.

    Raises:
        Exception: Any start-up error is printed with its traceback and
            re-raised after the socket and runner created so far have been
            released.
    """
    host = "127.0.0.1"
    port = 8080
    reuseport = True
    sock = None
    runner = None
    try:
        app = web.Application()
        app.add_routes([web.get('/', index)])
        runner = web.AppRunner(app)
        await runner.setup()
        sock = mk_socket(host, port, reuseport=reuseport)
        srv = web.SockSite(runner, sock)
        await srv.start()
    except Exception:
        traceback.print_exc()
        # Release what was acquired so a failed start does not leak the
        # bound socket or leave the runner set up.
        if sock is not None:
            sock.close()
        if runner is not None:
            await runner.cleanup()
        raise
    # Build the message from host/port so it cannot drift from the values
    # that were actually used.
    print('Server started at http://{}:{}'.format(host, port))
    return srv, app, runner
async def finalize(srv, app, runner):
    """Shut down the server started by ``start_server``.

    Args:
        srv: The site returned by ``start_server``. Unused; kept so existing
            callers keep working.
        app: The application returned by ``start_server``. Unused; kept so
            existing callers keep working.
        runner: The runner returned by ``start_server``; its ``cleanup()``
            coroutine is awaited.
    """
    # The site/runner API is torn down through the runner. The previous body
    # followed the older ``loop.create_server`` recipe (``srv.sockets``,
    # ``srv.close()``, ``app.finish()``), none of which exist on the objects
    # returned by ``start_server``.
    await runner.cleanup()
def init():
    """Run one aiohttp server on its own event loop until interrupted.

    Creates and installs a fresh event loop, starts the server, then serves
    until the loop is stopped. On ``KeyboardInterrupt`` the server is shut
    down via ``finalize``. The loop is always closed before returning.
    """
    # Create the loop explicitly: relying on get_event_loop() to create one
    # is deprecated and fails outside the main thread ("There is no current
    # event loop in thread").
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        srv, app, runner = loop.run_until_complete(start_server())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            loop.run_until_complete(finalize(srv, app, runner))
    finally:
        loop.close()
if __name__ == '__main__':
    # Submit init() once for every two CPUs.
    # NOTE: leaving the ``with`` block waits until every submitted job has
    # finished before execution continues.
    with ProcessPoolExecutor() as pool:
        for _ in range(CPU_COUNT // 2):
            pool.submit(init)
    # After aiohttp has started I want to execute more code,
    # for example:
    print("Hello world.")
    # In the actual program the ProcessPoolExecutor is called
    # inside a PyQt5 app,
    # so I don't want the PyQt5 app to freeze.
问题是使用该代码我无法在 ProcessPoolExecutor
调用后执行代码。
我该如何解决?
我试图删除这部分:
try:
    # Serve until the loop is stopped or the process is interrupted.
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C: shut the server down before leaving.
    loop.run_until_complete(finalize(srv, app, runner))
在 init()
方法中,但之后 aiohttp 服务器立即关闭。
编辑: 如果我使用线程而不是 ProcessPoolExecutor
,则会出现 aiohttp
错误:
RuntimeError: set_wakeup_fd only works in main thread
RuntimeError: There is no current event loop in thread
- Aiohttp + asyncio 相关错误。
也许我可以在 def 声明的上方
使用 @ 装饰器语法
(我猜)。
将 with
与执行器一起使用会导致您的进程阻塞,直到执行器中的所有作业完成;由于这些作业运行的是无限事件循环,它们永远不会完成,您的执行器也就永远不会解除阻塞。
相反,只需使用执行器启动作业,然后继续运行您的其他代码。最终完成后,调用 .shutdown()
等待进程退出:
# Start the jobs without a ``with`` block so this process is not blocked.
executor = ProcessPoolExecutor()
for _ in range(CPU_COUNT // 2):
    executor.submit(init)
# other code…
# Wait for the worker processes to exit.
executor.shutdown()
代码:
#!/usr/bin/env python
import asyncio
import os
import socket
import time
import traceback
from aiohttp import web
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
# CPU count reported by multiprocessing; the entry point uses it to decide
# how many server jobs to submit.
CPU_COUNT = cpu_count()
print("CPU Count: {}".format(CPU_COUNT))
def mk_socket(host="127.0.0.1", port=9090, reuseport=False):
    """Create a TCP/IPv4 socket bound to ``(host, port)``.

    Args:
        host: Address to bind to.
        port: TCP port to bind to.
        reuseport: When true, set ``SO_REUSEPORT`` on the socket before
            binding it.

    Returns:
        The bound socket; ``listen()`` has not been called on it.

    Raises:
        OSError: If ``reuseport`` is requested but the ``socket`` module does
            not expose ``SO_REUSEPORT`` on this platform, or if setting the
            option or binding fails.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        if reuseport:
            # The numeric value of SO_REUSEPORT is platform specific (15 is
            # only correct on Linux), so take it from the socket module.
            so_reuseport = getattr(socket, "SO_REUSEPORT", None)
            if so_reuseport is None:
                raise OSError("SO_REUSEPORT is not supported on this platform")
            sock.setsockopt(socket.SOL_SOCKET, so_reuseport, 1)
        sock.bind((host, port))
    except Exception:
        # Do not leak the descriptor when configuring or binding fails.
        sock.close()
        raise
    return sock
async def index(request):
    """Serve the Icecast index page, or an HTML error page if it is unreadable.

    Args:
        request: The incoming request (unused).

    Returns:
        A ``text/html`` ``web.Response`` holding the file contents, or a short
        HTML document showing the error message when reading the file fails.
    """
    import html  # local import: only needed to escape the error text

    icecast_index_path = os.path.abspath("../test/icecast/icecast_index.html")
    print(icecast_index_path)
    try:
        # The context manager closes the file even when read() raises.
        with open(icecast_index_path, encoding="utf8") as index_file:
            content = index_file.read()
    except (OSError, UnicodeError) as e:
        # Escape the message so characters such as "<" or "&" in it cannot
        # break the surrounding markup.
        message = html.escape(str(e), quote=False)
        return web.Response(content_type="text/html", text="<!doctype html><body><h1>Error: " + message + "</h1></body></html>")
    return web.Response(content_type="text/html", text=content)
async def start_server():
    """Start the aiohttp application on a pre-bound socket.

    Builds the application with its single ``/`` route, sets up an
    ``AppRunner`` and serves it through a ``SockSite`` on a socket created by
    ``mk_socket`` for 127.0.0.1:8080 with ``reuseport`` enabled.

    Returns:
        A ``(site, app, runner)`` tuple.

    Raises:
        Exception: Any start-up error is printed with its traceback and
            re-raised after the socket and runner created so far have been
            released.
    """
    host = "127.0.0.1"
    port = 8080
    reuseport = True
    sock = None
    runner = None
    try:
        app = web.Application()
        app.add_routes([web.get('/', index)])
        runner = web.AppRunner(app)
        await runner.setup()
        sock = mk_socket(host, port, reuseport=reuseport)
        srv = web.SockSite(runner, sock)
        await srv.start()
    except Exception:
        traceback.print_exc()
        # Release what was acquired so a failed start does not leak the
        # bound socket or leave the runner set up.
        if sock is not None:
            sock.close()
        if runner is not None:
            await runner.cleanup()
        raise
    # Build the message from host/port so it cannot drift from the values
    # that were actually used.
    print('Server started at http://{}:{}'.format(host, port))
    return srv, app, runner
async def finalize(srv, app, runner):
    """Shut down the server started by ``start_server``.

    Args:
        srv: The site returned by ``start_server``. Unused; kept so existing
            callers keep working.
        app: The application returned by ``start_server``. Unused; kept so
            existing callers keep working.
        runner: The runner returned by ``start_server``; its ``cleanup()``
            coroutine is awaited.
    """
    # The site/runner API is torn down through the runner. The previous body
    # followed the older ``loop.create_server`` recipe (``srv.sockets``,
    # ``srv.close()``, ``app.finish()``), none of which exist on the objects
    # returned by ``start_server``.
    await runner.cleanup()
def init():
    """Run one aiohttp server on its own event loop until interrupted.

    Creates and installs a fresh event loop, starts the server, then serves
    until the loop is stopped. On ``KeyboardInterrupt`` the server is shut
    down via ``finalize``. The loop is always closed before returning.
    """
    # Create the loop explicitly: relying on get_event_loop() to create one
    # is deprecated and fails outside the main thread ("There is no current
    # event loop in thread").
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        srv, app, runner = loop.run_until_complete(start_server())
        try:
            loop.run_forever()
        except KeyboardInterrupt:
            loop.run_until_complete(finalize(srv, app, runner))
    finally:
        loop.close()
if __name__ == '__main__':
    # Submit init() once for every two CPUs.
    # NOTE: leaving the ``with`` block waits until every submitted job has
    # finished before execution continues.
    with ProcessPoolExecutor() as pool:
        for _ in range(CPU_COUNT // 2):
            pool.submit(init)
    # After aiohttp has started I want to execute more code,
    # for example:
    print("Hello world.")
    # In the actual program the ProcessPoolExecutor is called
    # inside a PyQt5 app,
    # so I don't want the PyQt5 app to freeze.
问题是使用该代码我无法在 ProcessPoolExecutor
调用后执行代码。
我该如何解决?
我试图删除这部分:
try:
    # Serve until the loop is stopped or the process is interrupted.
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl+C: shut the server down before leaving.
    loop.run_until_complete(finalize(srv, app, runner))
在 init()
方法中,但之后 aiohttp 服务器立即关闭。
编辑: 如果我使用线程而不是 ProcessPoolExecutor
,则会出现 aiohttp
错误:
RuntimeError: set_wakeup_fd only works in main thread
RuntimeError: There is no current event loop in thread
- Aiohttp + asyncio 相关错误。
也许我可以在 def 声明的上方
使用 @ 装饰器语法
(我猜)。
将 with
与执行器一起使用会导致您的进程阻塞,直到执行器中的所有作业完成;由于这些作业运行的是无限事件循环,它们永远不会完成,您的执行器也就永远不会解除阻塞。
相反,只需使用执行器启动作业,然后继续运行您的其他代码。最终完成后,调用 .shutdown()
等待进程退出:
# Start the jobs without a ``with`` block so this process is not blocked.
executor = ProcessPoolExecutor()
for _ in range(CPU_COUNT // 2):
    executor.submit(init)
# other code…
# Wait for the worker processes to exit.
executor.shutdown()