How to combine python asyncio with threads?
I have successfully built a RESTful microservice with Python asyncio and aiohttp that listens to POST events to collect realtime events from various feeders.
It then builds an in-memory structure to cache the last 24 hours of events in a nested defaultdict/deque structure.
Now I would like to periodically checkpoint that structure to disk, preferably using pickle.
Since the memory structure can exceed 100MB, I would like to avoid holding up incoming event processing for the time it takes to checkpoint the structure.
I'd rather create a snapshot copy (e.g. deepcopy) of the structure, then take my time writing it to disk, and repeat on a preset time interval.
I have been searching for examples of how to combine threads (is a thread even the best solution for this?) with asyncio, but could not find anything that helped me.
Any pointers for getting started are much appreciated!
It's pretty simple to delegate a method to a thread or sub-process using BaseEventLoop.run_in_executor:
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x)  # This is some operation that is CPU-bound

async def main():
    # Run cpu_bound_operation in the ProcessPoolExecutor.
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in the meantime.
    await loop.run_in_executor(p, cpu_bound_operation, 5)

loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2)  # Create a ProcessPool with 2 processes
loop.run_until_complete(main())
As for whether to use a ProcessPoolExecutor or a ThreadPoolExecutor, that's hard to say; pickling a large object will definitely consume some CPU cycles, which initially suggests ProcessPoolExecutor is the way to go. However, passing your 100MB object to a Process in the pool would require pickling the instance in your main process, sending the bytes to the child process via IPC, unpickling it in the child process, and then pickling it again so you can write it to disk. Given that, my guess is the pickling/unpickling overhead will be large enough that you're better off with a ThreadPoolExecutor, even though you'll take a performance hit because of the GIL.
That said, it's very simple to test both ways and find out for sure, so you might as well do that.
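Applied to the checkpointing scenario from the question, a minimal sketch of that approach could look like the following. Names such as snapshot_to_disk and checkpoint_forever, the file path, and the 60-second interval are illustrative, not from the question:

import asyncio
import pickle
from concurrent.futures import ThreadPoolExecutor
from copy import deepcopy

executor = ThreadPoolExecutor(1)  # one worker, so checkpoints run sequentially

def snapshot_to_disk(snapshot, path='events.pickle'):
    # Runs in the worker thread, so the slow pickle+write
    # does not block the event loop.
    with open(path, 'wb') as f:
        pickle.dump(snapshot, f)

async def checkpoint_forever(cache, interval=60):
    loop = asyncio.get_event_loop()
    while True:
        await asyncio.sleep(interval)
        # The deepcopy itself still runs on the loop thread and blocks it
        # briefly; only the pickling and disk write are pushed to the thread.
        snapshot = deepcopy(cache)
        await loop.run_in_executor(executor, snapshot_to_disk, snapshot)

You would schedule it alongside the aiohttp handlers, e.g. with asyncio.ensure_future(checkpoint_forever(cache)).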
I also used run_in_executor, but I found this function kind of gross under most circumstances, since it requires partial() for keyword arguments and I never call it with anything other than a single executor and the default event loop. So I made a convenience wrapper around it with sensible defaults and automatic keyword argument handling.
from time import sleep
import asyncio as aio

loop = aio.get_event_loop()

class Executor:
    """In most cases, you can just use the 'execute' instance as a
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
    the executor, assign result to y. The defaults can be changed, though,
    with your own instantiation of Executor, i.e. execute =
    Executor(nthreads=4)"""

    def __init__(self, loop=loop, nthreads=1):
        from concurrent.futures import ThreadPoolExecutor
        self._ex = ThreadPoolExecutor(nthreads)
        self._loop = loop

    def __call__(self, f, *args, **kw):
        from functools import partial
        return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))

execute = Executor()

...

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20*alpha

async def main():
    y = await execute(cpu_bound_operation, 5, alpha=-2)

loop.run_until_complete(main())
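On Python 3.9+ the standard library covers the same ground: asyncio.to_thread() forwards both positional and keyword arguments to the function and runs it in the default ThreadPoolExecutor, so neither partial() nor a wrapper class is needed. A minimal sketch, reusing cpu_bound_operation from above:

import asyncio

async def main():
    # to_thread forwards args and kwargs, so no partial() is required
    y = await asyncio.to_thread(cpu_bound_operation, 5, alpha=-2)
    print(y)

asyncio.run(main())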
Another option is to use loop.call_soon_threadsafe along with an asyncio.Queue as the intermediate channel of communication.
The current documentation for Python 3 also has a section on Developing with asyncio - Concurrency and Multithreading:
import asyncio

# This method represents your blocking code
def blocking(loop, queue):
    import time
    while True:
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking A')
        time.sleep(2)
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking B')
        time.sleep(2)

# This method represents your async code
async def nonblocking(queue):
    await asyncio.sleep(1)
    while True:
        queue.put_nowait('Non-blocking A')
        await asyncio.sleep(2)
        queue.put_nowait('Non-blocking B')
        await asyncio.sleep(2)

# The main sets up the queue as the communication channel and synchronizes them
async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    blocking_fut = loop.run_in_executor(None, blocking, loop, queue)
    nonblocking_task = loop.create_task(nonblocking(queue))
    running = True  # use whatever exit condition
    while running:
        # Get messages from both blocking and non-blocking in parallel
        message = await queue.get()
        # You could send any messages, and do anything you want with them
        print(message)

asyncio.run(main())
It may also help you.
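For the opposite direction, calling a coroutine from a plain thread, that same docs section points to asyncio.run_coroutine_threadsafe(). A minimal sketch (double and worker are illustrative names):

import asyncio
import threading

async def double(x):
    await asyncio.sleep(1)
    return x * 2

def worker(loop):
    # Runs in a plain thread: schedule a coroutine on the running loop
    # and block on the returned concurrent.futures.Future for the result.
    fut = asyncio.run_coroutine_threadsafe(double(21), loop)
    print(fut.result())  # prints 42 after ~1 second

async def main():
    loop = asyncio.get_running_loop()
    t = threading.Thread(target=worker, args=(loop,))
    t.start()
    await asyncio.sleep(2)  # keep the loop running while the worker waits
    t.join()

asyncio.run(main())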