如何使用 multiprocessing 在新进程中调用异步函数?
How do I call an async function in a new process using multiprocessing?
我创建了一个等待 webhook 信号的服务器:收到信号时,它会创建一个新进程来运行 loop() 函数。
在运行 loop() 函数时,我希望它异步调用 printmessage() 函数,这样它就可以继续执行 loop() 函数的下一行,而不必等待 printmessage() 函数处理完成。
但是我遇到了以下错误,该如何解决?
#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio
async def printmessage(fruit):
    """Print ``fruit`` and then pause for five seconds.

    The pause uses the blocking ``time.sleep``, so although this is a
    coroutine it never yields control to the event loop while waiting.
    """
    print(fruit)
    # Blocking sleep (not ``await asyncio.sleep``): holds the whole thread.
    time.sleep(5)
async def loop(fruit):
    """Schedule ``printmessage(fruit)`` forever, one task per second.

    The body contains no ``await``; the one-second pause is the blocking
    ``time.sleep``, so this coroutine never hands control back to the
    event loop and the tasks it creates are not given a turn to run.
    """
    while True:
        # Only the most recent task is referenced; the name is rebound
        # on every pass.
        scheduled = asyncio.create_task(printmessage(fruit))
        time.sleep(1)
# Value the webhook handler passes to the worker process.
fruit = "apple"

if __name__ == '__main__':
    # Startup banner, shown only when the file is run directly as a script.
    print("PROGRAM LAUNCH...", "WEBHOOK RECEIVE READY...", sep="\n")

# FastAPI application object on which the /webhook route is registered.
app = FastAPI()
@app.post("/webhook")
async def webhook(request: Request):
    """Handle ``POST /webhook`` by starting ``loop`` in a new process.

    ``loop`` is a coroutine function, so the child process only calls it
    and obtains a coroutine object that nothing awaits.

    Returns:
        The string ``'WEBHOOK RECEIVED'``.
    """
    print("WEBHOOK RECEIVED")
    worker = mp.Process(target=loop, args=[fruit])
    worker.start()
    print('done')
    return 'WEBHOOK RECEIVED'
预期的输出应该是每 1 秒打印一次 apple。
错误:
RuntimeWarning: coroutine 'loop' was never awaited
self._target(*self._args, **self._kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
我尝试了以下方法来避免错误,但根本没有输出:
#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio
async def printmessage(fruit):
    """Print ``fruit`` and then pause for five seconds.

    The pause uses the blocking ``time.sleep``, so although this is a
    coroutine it never yields control to the event loop while waiting.
    """
    print(fruit)
    # Blocking sleep (not ``await asyncio.sleep``): holds the whole thread.
    time.sleep(5)
async def loop(fruit):
    """Schedule ``printmessage(fruit)`` forever, one task per second.

    The body contains no ``await``; the one-second pause is the blocking
    ``time.sleep``, so this coroutine never hands control back to the
    event loop and the tasks it creates are not given a turn to run.
    """
    while True:
        # Only the most recent task is referenced; the name is rebound
        # on every pass.
        scheduled = asyncio.create_task(printmessage(fruit))
        time.sleep(1)
def preloop(fruit):
    """Synchronous entry point that runs the ``loop`` coroutine.

    ``asyncio.run`` creates the event loop, which lets this plain
    function be used as a ``multiprocessing.Process`` target.
    """
    coroutine = loop(fruit)
    asyncio.run(coroutine)
# Value the webhook handler passes to the worker process.
fruit = "apple"

if __name__ == '__main__':
    # Startup banner, shown only when the file is run directly as a script.
    print("PROGRAM LAUNCH...", "WEBHOOK RECEIVE READY...", sep="\n")

# FastAPI application object on which the /webhook route is registered.
app = FastAPI()
@app.post("/webhook")
async def webhook(request: Request):
    """Handle ``POST /webhook`` by starting ``preloop`` in a new process.

    ``preloop`` is a plain function that drives the ``loop`` coroutine
    through ``asyncio.run`` inside the child process.

    Returns:
        The string ``'WEBHOOK RECEIVED'``.
    """
    print("WEBHOOK RECEIVED")
    worker = mp.Process(target=preloop, args=[fruit])
    worker.start()
    print('done')
    return 'WEBHOOK RECEIVED'
下面演示如何使用 multiprocessing 在新进程中调用异步函数。
在这段代码中,每个对 /webhook 的请求都会创建一个新进程,该进程每 5 秒打印一次 apple。
from __future__ import annotations
import asyncio
from multiprocessing import Process
from fastapi import FastAPI
# FastAPI application object; the route and shutdown handlers register on it.
app = FastAPI()
# Every process started by the webhook handler, kept so that the shutdown
# handler can stop and release them.
process_pool: list[Process] = []
async def print_message(fruit):
    """Write ``fruit`` to standard output."""
    print(fruit)
async def loop(fruit, interval=5):
    """Print ``fruit`` repeatedly, pausing ``interval`` seconds in between.

    Args:
        fruit: Value passed to ``print_message`` on every iteration.
        interval: Seconds to wait between prints. Defaults to 5, the
            delay that was previously hard-coded.

    This coroutine never returns on its own; it ends only when it is
    cancelled or its process is stopped.
    """
    while True:
        await print_message(fruit)
        # Non-blocking pause: yields to the event loop while waiting.
        await asyncio.sleep(interval)
def run_loop(fruit):
    """Process entry point: drive the ``loop`` coroutine with ``asyncio.run``."""
    coroutine = loop(fruit)
    asyncio.run(coroutine)
@app.get("/webhook")
async def webhook():
    """Handle ``GET /webhook`` by starting a process that runs ``run_loop``.

    The new process is recorded in ``process_pool`` before it is started.

    Returns:
        The string ``'WEBHOOK RECEIVED'``.
    """
    print("WEBHOOK RECEIVED")
    worker = Process(target=run_loop, args=("apple",))
    process_pool.append(worker)
    worker.start()
    print('done')
    return 'WEBHOOK RECEIVED'
@app.on_event("shutdown")
async def shutdown_event():
    """Stop and release every process recorded in ``process_pool``.

    All processes are killed first; each one is then waited for and its
    resources are released.
    """
    for process in process_pool:
        process.kill()
    for process in process_pool:
        # join() blocks until the killed process has exited, replacing the
        # CPU-spinning ``while process.is_alive(): continue`` poll.
        process.join()
        # close() raises ValueError while the process is still running,
        # so it has to come after join().
        process.close()
if __name__ == '__main__':
    # Startup banner, shown only when the file is run directly as a script.
    print("PROGRAM LAUNCH...", "WEBHOOK RECEIVE READY...", sep="\n")
我创建了一个等待 webhook 信号的服务器:收到信号时,它会创建一个新进程来运行 loop() 函数。
在运行 loop() 函数时,我希望它异步调用 printmessage() 函数,这样它就可以继续执行 loop() 函数的下一行,而不必等待 printmessage() 函数处理完成。
但是我遇到了以下错误,该如何解决?
#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio
async def printmessage(fruit):
    """Print ``fruit`` and then pause for five seconds.

    The pause uses the blocking ``time.sleep``, so although this is a
    coroutine it never yields control to the event loop while waiting.
    """
    print(fruit)
    # Blocking sleep (not ``await asyncio.sleep``): holds the whole thread.
    time.sleep(5)
async def loop(fruit):
    """Schedule ``printmessage(fruit)`` forever, one task per second.

    The body contains no ``await``; the one-second pause is the blocking
    ``time.sleep``, so this coroutine never hands control back to the
    event loop and the tasks it creates are not given a turn to run.
    """
    while True:
        # Only the most recent task is referenced; the name is rebound
        # on every pass.
        scheduled = asyncio.create_task(printmessage(fruit))
        time.sleep(1)
# Value the webhook handler passes to the worker process.
fruit = "apple"

if __name__ == '__main__':
    # Startup banner, shown only when the file is run directly as a script.
    print("PROGRAM LAUNCH...", "WEBHOOK RECEIVE READY...", sep="\n")

# FastAPI application object on which the /webhook route is registered.
app = FastAPI()
@app.post("/webhook")
async def webhook(request: Request):
    """Handle ``POST /webhook`` by starting ``loop`` in a new process.

    ``loop`` is a coroutine function, so the child process only calls it
    and obtains a coroutine object that nothing awaits.

    Returns:
        The string ``'WEBHOOK RECEIVED'``.
    """
    print("WEBHOOK RECEIVED")
    worker = mp.Process(target=loop, args=[fruit])
    worker.start()
    print('done')
    return 'WEBHOOK RECEIVED'
预期的输出应该是每 1 秒打印一次 apple。
错误:
RuntimeWarning: coroutine 'loop' was never awaited
self._target(*self._args, **self._kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
我尝试了以下方法来避免错误,但根本没有输出:
#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio
async def printmessage(fruit):
    """Print ``fruit`` and then pause for five seconds.

    The pause uses the blocking ``time.sleep``, so although this is a
    coroutine it never yields control to the event loop while waiting.
    """
    print(fruit)
    # Blocking sleep (not ``await asyncio.sleep``): holds the whole thread.
    time.sleep(5)
async def loop(fruit):
    """Schedule ``printmessage(fruit)`` forever, one task per second.

    The body contains no ``await``; the one-second pause is the blocking
    ``time.sleep``, so this coroutine never hands control back to the
    event loop and the tasks it creates are not given a turn to run.
    """
    while True:
        # Only the most recent task is referenced; the name is rebound
        # on every pass.
        scheduled = asyncio.create_task(printmessage(fruit))
        time.sleep(1)
def preloop(fruit):
    """Synchronous entry point that runs the ``loop`` coroutine.

    ``asyncio.run`` creates the event loop, which lets this plain
    function be used as a ``multiprocessing.Process`` target.
    """
    coroutine = loop(fruit)
    asyncio.run(coroutine)
# Value the webhook handler passes to the worker process.
fruit = "apple"

if __name__ == '__main__':
    # Startup banner, shown only when the file is run directly as a script.
    print("PROGRAM LAUNCH...", "WEBHOOK RECEIVE READY...", sep="\n")

# FastAPI application object on which the /webhook route is registered.
app = FastAPI()
@app.post("/webhook")
async def webhook(request: Request):
    """Handle ``POST /webhook`` by starting ``preloop`` in a new process.

    ``preloop`` is a plain function that drives the ``loop`` coroutine
    through ``asyncio.run`` inside the child process.

    Returns:
        The string ``'WEBHOOK RECEIVED'``.
    """
    print("WEBHOOK RECEIVED")
    worker = mp.Process(target=preloop, args=[fruit])
    worker.start()
    print('done')
    return 'WEBHOOK RECEIVED'
下面演示如何使用 multiprocessing 在新进程中调用异步函数。在这段代码中,每个对 /webhook 的请求都会创建一个新进程,该进程每 5 秒打印一次 apple。
from __future__ import annotations
import asyncio
from multiprocessing import Process
from fastapi import FastAPI
# FastAPI application object; the route and shutdown handlers register on it.
app = FastAPI()
# Every process started by the webhook handler, kept so that the shutdown
# handler can stop and release them.
process_pool: list[Process] = []
async def print_message(fruit):
    """Write ``fruit`` to standard output."""
    print(fruit)
async def loop(fruit, interval=5):
    """Print ``fruit`` repeatedly, pausing ``interval`` seconds in between.

    Args:
        fruit: Value passed to ``print_message`` on every iteration.
        interval: Seconds to wait between prints. Defaults to 5, the
            delay that was previously hard-coded.

    This coroutine never returns on its own; it ends only when it is
    cancelled or its process is stopped.
    """
    while True:
        await print_message(fruit)
        # Non-blocking pause: yields to the event loop while waiting.
        await asyncio.sleep(interval)
def run_loop(fruit):
    """Process entry point: drive the ``loop`` coroutine with ``asyncio.run``."""
    coroutine = loop(fruit)
    asyncio.run(coroutine)
@app.get("/webhook")
async def webhook():
    """Handle ``GET /webhook`` by starting a process that runs ``run_loop``.

    The new process is recorded in ``process_pool`` before it is started.

    Returns:
        The string ``'WEBHOOK RECEIVED'``.
    """
    print("WEBHOOK RECEIVED")
    worker = Process(target=run_loop, args=("apple",))
    process_pool.append(worker)
    worker.start()
    print('done')
    return 'WEBHOOK RECEIVED'
@app.on_event("shutdown")
async def shutdown_event():
    """Stop and release every process recorded in ``process_pool``.

    All processes are killed first; each one is then waited for and its
    resources are released.
    """
    for process in process_pool:
        process.kill()
    for process in process_pool:
        # join() blocks until the killed process has exited, replacing the
        # CPU-spinning ``while process.is_alive(): continue`` poll.
        process.join()
        # close() raises ValueError while the process is still running,
        # so it has to come after join().
        process.close()
if __name__ == '__main__':
    # Startup banner, shown only when the file is run directly as a script.
    print("PROGRAM LAUNCH...", "WEBHOOK RECEIVE READY...", sep="\n")