How do I call an async function in a new process using multiprocessing?

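I created a server that waits for a webhook signal. When a signal arrives, it starts a new process to run the loop() function. While loop() is running, I want it to call printmessage() asynchronously, so that it moves on to the next line of loop() without waiting for printmessage() to finish. However, I get the error below. How can I fix it?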

#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio


async def printmessage(fruit):
    print(fruit)
    time.sleep(5) 
    
async def loop(fruit):
    while True:     
        task = asyncio.create_task(printmessage(fruit))          
        time.sleep(1)

fruit="apple"
if __name__ == '__main__':

    print("PROGRAM LAUNCH...")
    print("WEBHOOK RECEIVE READY...")   

 
app = FastAPI()    
@app.post("/webhook")
async def webhook(request : Request):       

    print("WEBHOOK RECEIVED")    
    p = mp.Process(target=loop,args=[fruit])
    p.start() 
    print('done')   
    return 'WEBHOOK RECEIVED' 

The expected output is apple printed every 1 second.

Error:

RuntimeWarning: coroutine 'loop' was never awaited
  self._target(*self._args, **self._kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

I tried the following to avoid the error, but there is no output at all:

#main.py
import time
from fastapi import Request, FastAPI
import multiprocessing as mp
import uvicorn
import asyncio


async def printmessage(fruit):
    print(fruit)
    time.sleep(5) 
    
async def loop(fruit):
    while True:     
        task = asyncio.create_task(printmessage(fruit))          
        time.sleep(1)

def preloop(fruit):
    asyncio.run(loop(fruit))

fruit="apple"
if __name__ == '__main__':

    print("PROGRAM LAUNCH...")
    print("WEBHOOK RECEIVE READY...")  

 
app = FastAPI()    
@app.post("/webhook")
async def webhook(request : Request):       

    print("WEBHOOK RECEIVED")    
    p = mp.Process(target=preloop,args=[fruit])
    p.start() 
    print('done')   
    return 'WEBHOOK RECEIVED' 

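The warning appears because Process simply calls loop(fruit), and calling an async function only creates a coroutine object; nothing ever runs it. Wrapping the call in asyncio.run(), as in your second attempt, fixes that, but there is still no output because time.sleep() blocks the event loop, so the tasks created with asyncio.create_task() never get a chance to start. Use await asyncio.sleep() instead.

Here is how to call an async function in a new process using multiprocessing. In this code, each request to /webhook creates a new process that prints apple every 5 seconds.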

from __future__ import annotations
import asyncio
from multiprocessing import Process
from fastapi import FastAPI

app = FastAPI()

process_pool: list[Process] = []


async def print_message(fruit):
    print(fruit)


async def loop(fruit):
    while True:
        await print_message(fruit)
        await asyncio.sleep(5)


def run_loop(fruit):
    asyncio.run(loop(fruit))


@app.get("/webhook")
async def webhook():
    print("WEBHOOK RECEIVED")
    fruit = "apple"
    process = Process(target=run_loop, args=(fruit,))
    process_pool.append(process)
    process.start()
    print('done')
    return 'WEBHOOK RECEIVED'


@app.on_event("shutdown")
async def shutdown_event():
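    # Kill every child process first, then wait for each one to exit.
    # join() blocks until the process has ended, without a busy-wait loop.
    for process in process_pool:
        process.kill()
    for process in process_pool:
        process.join()
        process.close()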


if __name__ == '__main__':
    print("PROGRAM LAUNCH...")
    print("WEBHOOK RECEIVE READY...")
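
The code above awaits print_message() directly, which is fine as long as it returns right away. If print_message() did slow work and loop() should not wait for it, as the question asks, one possible approach is to schedule it with asyncio.create_task() and keep every pause non-blocking. The following is only a minimal sketch under that assumption: the `interval`, `work_seconds` and `iterations` parameters are hypothetical additions for illustration (they are not in the original code), and the FastAPI part is left out so the example stays self-contained.

```python
import asyncio
from multiprocessing import Process


async def print_message(fruit, work_seconds):
    print(fruit, flush=True)
    # Stand-in for slow work. Awaiting asyncio.sleep() hands control back
    # to the event loop, unlike time.sleep(), which would block it.
    await asyncio.sleep(work_seconds)


async def loop(fruit, interval=1.0, work_seconds=5.0, iterations=None):
    # Hypothetical parameters: `iterations=None` means run forever,
    # as in the question; a number makes the loop stop (handy for demos).
    pending = set()  # keep references so running tasks are not garbage-collected
    started = 0
    while iterations is None or started < iterations:
        # Schedule print_message() and continue without waiting for it.
        task = asyncio.create_task(print_message(fruit, work_seconds))
        pending.add(task)
        task.add_done_callback(pending.discard)
        started += 1
        await asyncio.sleep(interval)
    # Only reached when `iterations` is set: let unfinished tasks complete.
    if pending:
        await asyncio.gather(*pending)
    return started


def run_loop(fruit, interval=1.0, work_seconds=5.0, iterations=None):
    # Plain function used as the Process target; it owns the event loop.
    asyncio.run(loop(fruit, interval, work_seconds, iterations))


if __name__ == "__main__":
    # Short timings so the demo ends quickly; the question's values
    # would be interval=1.0 and work_seconds=5.0.
    process = Process(target=run_loop, args=("apple", 0.1, 0.5, 3))
    process.start()
    process.join()
```

In the webhook handler, run_loop would be passed as the Process target in the same way as in the answer's code. Because each print_message() call runs as its own task, a new apple is printed every `interval` seconds even though each call takes `work_seconds` to finish.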