如何 运行 独立于异步循环的阻塞代码

How to run a blocking code independently from asyncio loop

我的项目要求我 运行 一个阻塞代码(来自另一个库),同时继续我的 asyncio while: true 循环。代码看起来像这样:

async def main():
    while True:
        session_timeout = aiohttp.ClientTimeout()
        async with aiohttp.ClientSession() as session:
            
            // Do async stuffs like session.get and so on
            
            # At a certain point, I have a blocking code that I need to execute
            
            // Blocking_code() starts here. The blocking code needs time to get the return value.
               Running blocking_code() is the last thing to do in my main() function.
            
            # My objective is to run the blocking code separately. 
            # Such that whilst the blocking_code() runs, I would like my loop to start from the beginning again,
            # and not having to wait until blocking_code() completes and returns.
            # In other words, go back to the top of the while loop.
            
            # Separately, the blocking_code() will continue to run independently, which would eventually complete
            # and returns. When it returns, nothing in main() will need the return value. Rather the returned
            # result continue to be used in blocking_code()

asyncio.run(main())

我试过使用 pool = ThreadPool(processes=1)thread = pool.apply_async(blocking_code, params)。如果在 main() 中的 blocking_code() 之后需要完成一些事情,它就可以工作;但是 blocking_code() 是 main() 中的最后一件事,它会导致整个 while 循环暂停,直到 blocking_code() 完成,然后再从顶部开始。

我不知道这是否可行,如果可行,它是如何实现的;但理想的场景是这样的。

运行 main(),然后 运行 blocking_code() 在它自己的实例中。就像执行另一个 .py 文件一样。因此,一旦循环到达 main() 中的 blocking_code(),它会触发 blocking_code.py 文件,同时 blocking_code.py 脚本 运行s,while 循环再次从顶部继续.

如果到while循环第2遍的时候,又到了blocking_code(),之前的运行还没有完成; blocking_code() 的另一个实例将 运行 在它自己的实例上,独立地。

我说的有道理吗?是否可以达到预期的效果?

谢谢!

线程可以做到这一点。所以你不会阻塞你的主循环,你需要将你的线程包装在一个 asyncio 任务中。如果需要,您可以在循环完成后等待 return 值。您可以结合使用 asyncio.create_taskasyncio.to_thread

import aiohttp
import asyncio
import time

def blocking_code():
    print('Starting blocking code.')
    time.sleep(5)
    print('Finished blocking code.')

async def main():
    blocking_code_tasks = []

    while True:
        session_timeout = aiohttp.ClientTimeout()
        async with aiohttp.ClientSession() as session:
        
            print('Executing GET.')
            result = await session.get('https://www.example.com')
        
            blocking_code_task = asyncio.create_task(asyncio.to_thread(blocking_code))
            blocking_code_tasks.append(blocking_code_task)

    #do something with blocking_code_tasks, wait for them to finish, extract errors, etc.

asyncio.run(main())

上面的代码在线程中运行阻塞代码,然后将其放入异步任务中。然后我们将其添加到 blocking_code_tasks 列表以跟踪所有当前 运行 任务。稍后,您可以使用 asyncio.gather

之类的东西来获取值或错误