如何 运行 多个并行阻塞 IO 协程
How to run multiple blocking IO coroutines in parallel
我有一个很长的 运行ning 管道,它是用协程实现的。这通常是获取日志流,执行一些丰富(线程化)并将它们写入数据存储。
下面是一个模拟管道的小例子:
import time
import random
from concurrent import futures
def coroutine(func):
def start(*args, **kwargs):
cr = func(*args, **kwargs)
next(cr)
return cr
return start
@coroutine
def foo():
pool = futures.ThreadPoolExecutor(max_workers=10)
while True:
i = (yield)
fut = pool.submit(enrich, i)
fut.add_done_callback(result_handler)
time.sleep(random.random()*10)
def enrich(i):
enriched = 'foo' + str(i)
time.sleep(random.random())
return enriched
def source(name, target):
while True:
time.sleep(random.random())
i = random.randint(0,10)
target.send(name + str(i))
如下调用的单个管道工作正常。
source('task one ', foo())
现在,我想 运行 后台线程中不同日志的多个管道。一种尝试是再次使用ThreadPoolExecutor来驱动多个流水线。
def run():
pool = futures.ThreadPoolExecutor(max_workers=10)
tasks = [source('task one ', foo()),
source('task two ', foo())]
for task in tasks:
fut = pool.submit(task)
fut.add_done_callback(result_handler)
但是,管道在第一个任务之后阻塞并且永远不会执行第二个任务。在后台线程中 运行 如此长的 运行 宁(也许永远)管道的正确方法是什么?
因为source
函数是永无止境的,所以不会创建tasks = [source('task one ', foo()), source('task two ', foo())]
列表。这就是第一个任务运行而管道阻塞的原因。
解决方案是将 source
及其参数传递给 pool.submit
。
tasks = [(source, 'task one', foo()), (source, 'task two', foo())]
for task in tasks:
fut = pool.submit(*task)
fut.add_done_callback(result_handler)
我有一个很长的 运行ning 管道,它是用协程实现的。这通常是获取日志流,执行一些丰富(线程化)并将它们写入数据存储。
下面是一个模拟管道的小例子:
import time
import random
from concurrent import futures
def coroutine(func):
def start(*args, **kwargs):
cr = func(*args, **kwargs)
next(cr)
return cr
return start
@coroutine
def foo():
pool = futures.ThreadPoolExecutor(max_workers=10)
while True:
i = (yield)
fut = pool.submit(enrich, i)
fut.add_done_callback(result_handler)
time.sleep(random.random()*10)
def enrich(i):
enriched = 'foo' + str(i)
time.sleep(random.random())
return enriched
def source(name, target):
while True:
time.sleep(random.random())
i = random.randint(0,10)
target.send(name + str(i))
如下调用的单个管道工作正常。
source('task one ', foo())
现在,我想 运行 后台线程中不同日志的多个管道。一种尝试是再次使用ThreadPoolExecutor来驱动多个流水线。
def run():
pool = futures.ThreadPoolExecutor(max_workers=10)
tasks = [source('task one ', foo()),
source('task two ', foo())]
for task in tasks:
fut = pool.submit(task)
fut.add_done_callback(result_handler)
但是,管道在第一个任务之后阻塞并且永远不会执行第二个任务。在后台线程中 运行 如此长的 运行 宁(也许永远)管道的正确方法是什么?
因为source
函数是永无止境的,所以不会创建tasks = [source('task one ', foo()), source('task two ', foo())]
列表。这就是第一个任务运行而管道阻塞的原因。
解决方案是将 source
及其参数传递给 pool.submit
。
tasks = [(source, 'task one', foo()), (source, 'task two', foo())]
for task in tasks:
fut = pool.submit(*task)
fut.add_done_callback(result_handler)