如何 运行 独立于异步循环的阻塞代码
How to run a blocking code independently from asyncio loop
我的项目要求我 运行 一个阻塞代码(来自另一个库),同时继续我的 asyncio while: true 循环。代码看起来像这样:
async def main():
while True:
session_timeout = aiohttp.ClientTimeout()
async with aiohttp.ClientSession() as session:
// Do async stuffs like session.get and so on
# At a certain point, I have a blocking code that I need to execute
// Blocking_code() starts here. The blocking code needs time to get the return value.
Running blocking_code() is the last thing to do in my main() function.
# My objective is to run the blocking code separately.
# Such that whilst the blocking_code() runs, I would like my loop to start from the beginning again,
# and not having to wait until blocking_code() completes and returns.
# In other words, go back to the top of the while loop.
# Separately, the blocking_code() will continue to run independently, which would eventually complete
# and returns. When it returns, nothing in main() will need the return value. Rather the returned
# result continue to be used in blocking_code()
asyncio.run(main())
我试过使用 pool = ThreadPool(processes=1)
和 thread = pool.apply_async(blocking_code, params)
。如果在 main() 中的 blocking_code() 之后需要完成一些事情,它就可以工作;但是 blocking_code() 是 main() 中的最后一件事,它会导致整个 while 循环暂停,直到 blocking_code() 完成,然后再从顶部开始。
我不知道这是否可行,如果可行,它是如何实现的;但理想的场景是这样的。
运行 main(),然后 运行 blocking_code() 在它自己的实例中。就像执行另一个 .py 文件一样。因此,一旦循环到达 main() 中的 blocking_code(),它会触发 blocking_code.py 文件,同时 blocking_code.py 脚本 运行s,while 循环再次从顶部继续.
如果到while循环第2遍的时候,又到了blocking_code(),之前的运行还没有完成; blocking_code() 的另一个实例将 运行 在它自己的实例上,独立地。
我说的有道理吗?是否可以达到预期的效果?
谢谢!
线程可以做到这一点。所以你不会阻塞你的主循环,你需要将你的线程包装在一个 asyncio 任务中。如果需要,您可以在循环完成后等待 return 值。您可以结合使用 asyncio.create_task
和 asyncio.to_thread
import aiohttp
import asyncio
import time
def blocking_code():
print('Starting blocking code.')
time.sleep(5)
print('Finished blocking code.')
async def main():
blocking_code_tasks = []
while True:
session_timeout = aiohttp.ClientTimeout()
async with aiohttp.ClientSession() as session:
print('Executing GET.')
result = await session.get('https://www.example.com')
blocking_code_task = asyncio.create_task(asyncio.to_thread(blocking_code))
blocking_code_tasks.append(blocking_code_task)
#do something with blocking_code_tasks, wait for them to finish, extract errors, etc.
asyncio.run(main())
上面的代码在线程中运行阻塞代码,然后将其放入异步任务中。然后我们将其添加到 blocking_code_tasks
列表以跟踪所有当前 运行 任务。稍后,您可以使用 asyncio.gather
之类的东西来获取值或错误
我的项目要求我 运行 一个阻塞代码(来自另一个库),同时继续我的 asyncio while: true 循环。代码看起来像这样:
async def main():
while True:
session_timeout = aiohttp.ClientTimeout()
async with aiohttp.ClientSession() as session:
// Do async stuffs like session.get and so on
# At a certain point, I have a blocking code that I need to execute
// Blocking_code() starts here. The blocking code needs time to get the return value.
Running blocking_code() is the last thing to do in my main() function.
# My objective is to run the blocking code separately.
# Such that whilst the blocking_code() runs, I would like my loop to start from the beginning again,
# and not having to wait until blocking_code() completes and returns.
# In other words, go back to the top of the while loop.
# Separately, the blocking_code() will continue to run independently, which would eventually complete
# and returns. When it returns, nothing in main() will need the return value. Rather the returned
# result continue to be used in blocking_code()
asyncio.run(main())
我试过使用 pool = ThreadPool(processes=1)
和 thread = pool.apply_async(blocking_code, params)
。如果在 main() 中的 blocking_code() 之后需要完成一些事情,它就可以工作;但是 blocking_code() 是 main() 中的最后一件事,它会导致整个 while 循环暂停,直到 blocking_code() 完成,然后再从顶部开始。
我不知道这是否可行,如果可行,它是如何实现的;但理想的场景是这样的。
运行 main(),然后 运行 blocking_code() 在它自己的实例中。就像执行另一个 .py 文件一样。因此,一旦循环到达 main() 中的 blocking_code(),它会触发 blocking_code.py 文件,同时 blocking_code.py 脚本 运行s,while 循环再次从顶部继续.
如果到while循环第2遍的时候,又到了blocking_code(),之前的运行还没有完成; blocking_code() 的另一个实例将 运行 在它自己的实例上,独立地。
我说的有道理吗?是否可以达到预期的效果?
谢谢!
线程可以做到这一点。所以你不会阻塞你的主循环,你需要将你的线程包装在一个 asyncio 任务中。如果需要,您可以在循环完成后等待 return 值。您可以结合使用 asyncio.create_task
和 asyncio.to_thread
import aiohttp
import asyncio
import time
def blocking_code():
print('Starting blocking code.')
time.sleep(5)
print('Finished blocking code.')
async def main():
blocking_code_tasks = []
while True:
session_timeout = aiohttp.ClientTimeout()
async with aiohttp.ClientSession() as session:
print('Executing GET.')
result = await session.get('https://www.example.com')
blocking_code_task = asyncio.create_task(asyncio.to_thread(blocking_code))
blocking_code_tasks.append(blocking_code_task)
#do something with blocking_code_tasks, wait for them to finish, extract errors, etc.
asyncio.run(main())
上面的代码在线程中运行阻塞代码,然后将其放入异步任务中。然后我们将其添加到 blocking_code_tasks
列表以跟踪所有当前 运行 任务。稍后,您可以使用 asyncio.gather