Alternative to itertools.tee in Python
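I'm processing a large amount of input data that is split across multiple files. To keep the processing algorithms separate from the I/O, I set everything up with generators. This works well, except when I want to perform some intermediate operations on the data passing through the generators. Here is an example that gets at the point: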
import numpy as np
from itertools import izip, tee

# Have two input matrices. In reality they're very large, so data is provided
# one row at a time via generators.
M, N = 100, 3
def gen_data_rows(m,n):
    for i in range(m):
        yield np.random.normal(size=n)

rows1 = gen_data_rows(M,N)
rows2 = gen_data_rows(M,N)

# Signal processing operates on chunks of the input, e.g. blocks of rows and
# yields results at a reduced rate. Here's a simple example.
def foo_rows(rows):
    i = 0
    for row in rows:
        if i % 5 == 0:
            yield row
        i += 1

# But what if we want to do some transformations between the raw input data
# and the processing?
def fun1(x, y):
    return x + y

def fun2(x, y):
    return (x + y)**2

def foo_transformed_rows(rows1, rows2):
    # Define a generator that consumes both inputs at the same time and
    # produces two streams of output I'd like to send to foo_rows().
    def gen_transformed_rows(rows1, rows2):
        for x, y in izip(rows1, rows2):
            yield fun1(x,y), fun2(x,y)

    # Do I really need to tee the above and define separate generators to pick
    # off each result?
    def pick_generator_idx(gen, i):
        for vals in gen:
            yield vals[i]

    gen_xformed_rows, dupe = tee(gen_transformed_rows(rows1, rows2))
    gen_foo_fun1 = foo_rows(pick_generator_idx(gen_xformed_rows, 0))
    gen_foo_fun2 = foo_rows(pick_generator_idx(dupe, 1))
    for foo1, foo2 in izip(gen_foo_fun1, gen_foo_fun2):
        yield foo1, foo2

for foo1, foo2 in foo_transformed_rows(rows1, rows2):
    print foo1, foo2
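I think the main issue here is that I have two inputs that I want to combine into two intermediate generators (I/O is the bottleneck, so I really don't want to run through the data twice). Is there a better way to implement the foo_transformed_rows() function? Having to tee() the data and define a generator just to pick items out of a tuple seems like overkill.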
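Edit: I modified the example slightly based on the comments, but unfortunately it is still long in order to stay complete. The basic problem is processing multiple-input, multiple-output (MIMO) data streams. I think I want something like a yield statement that produces multiple generators, e.g.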
def two_streams(gen_a, gen_b):
    "Consumes two generators, produces two results."
    for a, b in itertools.izip(gen_a, gen_b):
        c, d = foo(a, b)
        yield c, d

# This doesn't work. You get one generator of tuples instead of
# two generators of singletons.
gen_c, gen_d = two_streams(gen_a, gen_b)
I thought there might be some itertools magic that does the same thing.
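For reference, the split asked about here (one generator of tuples into several generators of single items) can be sketched with tee plus operator.itemgetter. This is only an illustration: unzip_streams is a hypothetical helper name, not something itertools provides, and the sum/product body of two_streams stands in for the undefined foo(). It is written for Python 3, where the built-in zip is lazy and plays the role of izip.

```python
from itertools import tee
from operator import itemgetter


def unzip_streams(tuples, n):
    """Split one iterator of n-tuples into n lazy iterators, one per position.

    Hypothetical helper, not part of itertools. The source is consumed only
    once; tee() buffers items that one output has seen and another has not.
    """
    copies = tee(tuples, n)
    return tuple(map(itemgetter(i), copy) for i, copy in enumerate(copies))


def two_streams(gen_a, gen_b):
    """Consume two iterators and yield one (sum, product) tuple per pair."""
    for a, b in zip(gen_a, gen_b):
        yield a + b, a * b


gen_c, gen_d = unzip_streams(two_streams(iter([1, 2, 3]), iter([10, 20, 30])), 2)

# Consuming both outputs in lockstep keeps the tee() buffer small.
pairs = list(zip(gen_c, gen_d))
```

This still relies on tee internally; it only hides the "pick one item out of the tuple" generators behind a single call.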
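I agree with @ShadowRanger's comment: I don't see why you want to avoid tee. It is well suited to this purpose.
However, to me it seems simpler and more intuitive to tee the original generators: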
def transform_rows(fun, rows1, rows2):
    for x, y in izip(rows1, rows2):
        yield fun(x,y)

rows1a, rows1b = tee(rows1)
rows2a, rows2b = tee(rows2)
gen_foo_fun1 = foo_rows(transform_rows(fun1, rows1a, rows2a))
gen_foo_fun2 = foo_rows(transform_rows(fun2, rows1b, rows2b))
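A self-contained Python 3 sketch of this approach follows, mainly to check that each source is read only once. Several things in it are assumptions made for illustration: plain lists replace the NumPy rows so that it runs without dependencies, the reads counter is added instrumentation, and the built-in zip replaces izip.

```python
from itertools import tee

reads = {"count": 0}  # counts rows pulled from the (simulated) I/O sources


def gen_data_rows(m, n):
    """Stand-in for the I/O-bound source: yields m rows of n floats."""
    for i in range(m):
        reads["count"] += 1
        yield [float(i)] * n


def foo_rows(rows):
    """Keep every fifth row, as in the question."""
    for i, row in enumerate(rows):
        if i % 5 == 0:
            yield row


def fun1(x, y):
    return [a + b for a, b in zip(x, y)]


def fun2(x, y):
    return [(a + b) ** 2 for a, b in zip(x, y)]


def transform_rows(fun, rows1, rows2):
    for x, y in zip(rows1, rows2):
        yield fun(x, y)


# tee() each raw input once, then build one pipeline per transformation.
rows1a, rows1b = tee(gen_data_rows(10, 3))
rows2a, rows2b = tee(gen_data_rows(10, 3))
gen_foo_fun1 = foo_rows(transform_rows(fun1, rows1a, rows2a))
gen_foo_fun2 = foo_rows(transform_rows(fun2, rows1b, rows2b))

# Consume both pipelines in lockstep so neither gets far ahead of the other.
results = list(zip(gen_foo_fun1, gen_foo_fun2))
```

One thing to keep in mind: tee holds every row that one branch has consumed and the other has not, so memory stays bounded only when the two pipelines are advanced together, as the zip above does. If one pipeline were drained completely before the other started, tee would end up buffering the whole input.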