如何在 python 中设计异步管道模式
How to design an async pipeline pattern in python
我正在尝试设计一个可以轻松制作数据处理管道的异步管道。管道由几个函数组成。输入数据从管道的一端进入,从另一端出来。
我想按照以下方式设计管道:
- 可以在管道中插入其他功能
- 管道中已有的功能可以弹出。
这是我想出的:
import asyncio
@asyncio.coroutine
def add(x):
return x + 1
@asyncio.coroutine
def prod(x):
return x * 2
@asyncio.coroutine
def power(x):
return x ** 3
def connect(funcs):
def wrapper(*args, **kwargs):
data_out = yield from funcs[0](*args, **kwargs)
for func in funcs[1:]:
data_out = yield from func(data_out)
return data_out
return wrapper
pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)
这当然有效,但问题是,如果我想向此管道添加另一个函数(或从中弹出一个函数),我必须再次拆卸并重新连接每个函数。
我想知道是否有更好的方案或设计模式来创建这样的管道?
我不知道这是否是最好的方法,但这是我的解决方案。
虽然我认为可以使用列表或字典来控制管道,但我发现使用生成器更容易、更有效。
考虑以下生成器:
def controller():
old = value = None
while True:
new = (yield value)
value = old
old = new
这基本上是一个 one-element 队列,它存储您发送给它的值并在下次调用 send
(或 next
)时释放它。
示例:
>>> c = controller()
>>> next(c) # prime the generator
>>> c.send(8) # send a value
>>> next(c) # pull the value from the generator
8
通过将管道中的每个协程与其控制器相关联,我们将拥有一个外部句柄,我们可以使用它来推送每个协程的目标。我们只需要以一种方式定义我们的协同程序,它们将在每个周期从我们的控制器中拉出新目标。
现在考虑以下协程:
def source(controller):
while True:
target = next(controller)
print("source sending to", target.__name__)
yield (yield from target)
def add():
return (yield) + 1
def prod():
return (yield) * 2
源是一个不 return
的协程,因此它不会在第一个循环后自行终止。其他协程是 "sinks" 并且不需要控制器。
您可以在管道中使用这些协程,如以下示例所示。我们最初设置了一条路线 source --> add
,在收到第一个结果后,我们将路线更改为 source --> prod
.
# create a controller for the source and prime it
cont_source = controller()
next(cont_source)
# create three coroutines
# associate the source with its controller
coro_source = source(cont_source)
coro_add = add()
coro_prod = prod()
# create a pipeline
cont_source.send(coro_add)
# prime the source and send a value to it
coro_source.send(None)
print("add =", coro_source.send(4))
# change target of the source
cont_source.send(coro_prod)
# reset the source, send another value
coro_source.send(None)
print("prod =", coro_source.send(8))
输出:
source sending to add
add = 5
source sending to prod
prod = 16
我以前做过类似的事情,只使用 multiprocessing 库。它有点手动,但它使您能够按照您在问题中的要求轻松创建和修改管道。
这个想法是创建可以存在于多处理池中的函数,它们唯一的参数是一个输入队列和一个输出队列。您通过传递不同的队列将阶段联系在一起。每个阶段在其输入队列上接收一些工作,再做一些工作,然后通过其输出队列将结果传递到下一个阶段。
工人们不停地旋转,试图从他们的队列中得到一些东西,当他们得到一些东西时,他们就开始工作并将结果传递给下一个阶段。所有工作都通过管道传递 "poison pill" 结束,导致所有阶段退出:
这个例子只是在多个工作阶段构建一个字符串:
import multiprocessing as mp
POISON_PILL = "STOP"
def stage1(q_in, q_out):
while True:
# get either work or a poison pill from the previous stage (or main)
val = q_in.get()
# check to see if we got the poison pill - pass it along if we did
if val == POISON_PILL:
q_out.put(val)
return
# do stage 1 work
val = val + "Stage 1 did some work.\n"
# pass the result to the next stage
q_out.put(val)
def stage2(q_in, q_out):
while True:
val = q_in.get()
if val == POISON_PILL:
q_out.put(val)
return
val = val + "Stage 2 did some work.\n"
q_out.put(val)
def main():
pool = mp.Pool()
manager = mp.Manager()
# create managed queues
q_main_to_s1 = manager.Queue()
q_s1_to_s2 = manager.Queue()
q_s2_to_main = manager.Queue()
# launch workers, passing them the queues they need
results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))
# Send a message into the pipeline
q_main_to_s1.put("Main started the job.\n")
# Wait for work to complete
print(q_s2_to_main.get()+"Main finished the job.")
q_main_to_s1.put(POISON_PILL)
pool.close()
pool.join()
return
if __name__ == "__main__":
main()
代码生成此输出:
Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.
您可以轻松地在管道中放置更多阶段,或者只需更改哪些函数获取哪些队列即可重新排列它们。我对 asyncio 模块不是很熟悉,所以我不能说使用多处理库会失去哪些功能,但这种方法实现和理解起来非常简单,所以我喜欢它简单。
我正在尝试设计一个可以轻松制作数据处理管道的异步管道。管道由几个函数组成。输入数据从管道的一端进入,从另一端出来。
我想按照以下方式设计管道:
- 可以在管道中插入其他功能
- 管道中已有的功能可以弹出。
这是我想出的:
import asyncio
@asyncio.coroutine
def add(x):
return x + 1
@asyncio.coroutine
def prod(x):
return x * 2
@asyncio.coroutine
def power(x):
return x ** 3
def connect(funcs):
def wrapper(*args, **kwargs):
data_out = yield from funcs[0](*args, **kwargs)
for func in funcs[1:]:
data_out = yield from func(data_out)
return data_out
return wrapper
pipeline = connect([add, prod, power])
input = 1
output = asyncio.get_event_loop().run_until_complete(pipeline(input))
print(output)
这当然有效,但问题是,如果我想向此管道添加另一个函数(或从中弹出一个函数),我必须再次拆卸并重新连接每个函数。
我想知道是否有更好的方案或设计模式来创建这样的管道?
我不知道这是否是最好的方法,但这是我的解决方案。
虽然我认为可以使用列表或字典来控制管道,但我发现使用生成器更容易、更有效。
考虑以下生成器:
def controller():
old = value = None
while True:
new = (yield value)
value = old
old = new
这基本上是一个 one-element 队列,它存储您发送给它的值并在下次调用 send
(或 next
)时释放它。
示例:
>>> c = controller()
>>> next(c) # prime the generator
>>> c.send(8) # send a value
>>> next(c) # pull the value from the generator
8
通过将管道中的每个协程与其控制器相关联,我们将拥有一个外部句柄,我们可以使用它来推送每个协程的目标。我们只需要以一种方式定义我们的协同程序,它们将在每个周期从我们的控制器中拉出新目标。
现在考虑以下协程:
def source(controller):
while True:
target = next(controller)
print("source sending to", target.__name__)
yield (yield from target)
def add():
return (yield) + 1
def prod():
return (yield) * 2
源是一个不 return
的协程,因此它不会在第一个循环后自行终止。其他协程是 "sinks" 并且不需要控制器。
您可以在管道中使用这些协程,如以下示例所示。我们最初设置了一条路线 source --> add
,在收到第一个结果后,我们将路线更改为 source --> prod
.
# create a controller for the source and prime it
cont_source = controller()
next(cont_source)
# create three coroutines
# associate the source with its controller
coro_source = source(cont_source)
coro_add = add()
coro_prod = prod()
# create a pipeline
cont_source.send(coro_add)
# prime the source and send a value to it
coro_source.send(None)
print("add =", coro_source.send(4))
# change target of the source
cont_source.send(coro_prod)
# reset the source, send another value
coro_source.send(None)
print("prod =", coro_source.send(8))
输出:
source sending to add
add = 5
source sending to prod
prod = 16
我以前做过类似的事情,只使用 multiprocessing 库。它有点手动,但它使您能够按照您在问题中的要求轻松创建和修改管道。
这个想法是创建可以存在于多处理池中的函数,它们唯一的参数是一个输入队列和一个输出队列。您通过传递不同的队列将阶段联系在一起。每个阶段在其输入队列上接收一些工作,再做一些工作,然后通过其输出队列将结果传递到下一个阶段。
工人们不停地旋转,试图从他们的队列中得到一些东西,当他们得到一些东西时,他们就开始工作并将结果传递给下一个阶段。所有工作都通过管道传递 "poison pill" 结束,导致所有阶段退出:
这个例子只是在多个工作阶段构建一个字符串:
import multiprocessing as mp
POISON_PILL = "STOP"
def stage1(q_in, q_out):
while True:
# get either work or a poison pill from the previous stage (or main)
val = q_in.get()
# check to see if we got the poison pill - pass it along if we did
if val == POISON_PILL:
q_out.put(val)
return
# do stage 1 work
val = val + "Stage 1 did some work.\n"
# pass the result to the next stage
q_out.put(val)
def stage2(q_in, q_out):
while True:
val = q_in.get()
if val == POISON_PILL:
q_out.put(val)
return
val = val + "Stage 2 did some work.\n"
q_out.put(val)
def main():
pool = mp.Pool()
manager = mp.Manager()
# create managed queues
q_main_to_s1 = manager.Queue()
q_s1_to_s2 = manager.Queue()
q_s2_to_main = manager.Queue()
# launch workers, passing them the queues they need
results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2))
results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main))
# Send a message into the pipeline
q_main_to_s1.put("Main started the job.\n")
# Wait for work to complete
print(q_s2_to_main.get()+"Main finished the job.")
q_main_to_s1.put(POISON_PILL)
pool.close()
pool.join()
return
if __name__ == "__main__":
main()
代码生成此输出:
Main started the job.
Stage 1 did some work.
Stage 2 did some work.
Main finished the job.
您可以轻松地在管道中放置更多阶段,或者只需更改哪些函数获取哪些队列即可重新排列它们。我对 asyncio 模块不是很熟悉,所以我不能说使用多处理库会失去哪些功能,但这种方法实现和理解起来非常简单,所以我喜欢它简单。