如何将元组的 Python 生成器拆分为 2 个独立的生成器?
How to split a Python generator of tuples into 2 separate generators?
我有一个生成器,大致如下:
def gen1():
for x, y in enumerate(xrange(20)):
a = 5*x
b = 10*y
yield a, b
从这个生成器,我想创建 2 个独立的生成器,如下所示:
for a in gen1_split_a():
yield a
for b in gen1_split_b():
yield b
我的玩法是什么,SA?
你不能,不是没有结束所有生成器输出只是为了能够在第二个循环中产生 b
值。这在内存方面可能会变得昂贵。
您将 itertools.tee()
用于 'duplicate' 生成器:
from itertools import tee
def split_gen(gen):
gen_a, gen_b = tee(gen, 2)
return (a for a, b in gen_a), (b for a, b in gen_b)
gen1_split_a, gen1_split_b = split_gen(gen1)
for a in gen1_split_a:
print a
for b in gen1_split_b:
print b
但在这种情况下发生的是 tee
对象最终将不得不存储 所有 gen1
产生的东西 。来自文档:
This itertool may require significant auxiliary storage (depending on how much temporary data needs to be stored). In general, if one iterator uses most or all of the data before another iterator starts, it is faster to use list()
instead of tee()
.
按照该建议,只需将 b
值放入第二个循环的列表中:
b_values = []
for a, b in gen1():
print a
b_values.append(a)
for b in b_values:
print b
或者更好的是,只需在一个循环中同时处理 a
和 b
。
我有一个解决方案可能不是您想要的。它将 n
元组生成器分成 n
个单独生成器的元组。 但是,它要求当前元组的每个单独值都已返回才能继续下一个元组。严格来说,它 "splits" 一个 n
元组生成器到 n
个生成器,但是你的例子不会像展示的那样工作。
它利用 Python 将值发送回生成器以影响未来收益的能力。同样的想法也应该可以用 类 来实现,但我还是想掌握生成器。
当新的生成器被初始化时,它们只知道当前的n
元组。每次它们在各自的索引处产生值时,都会执行一个回调,通知更高级别的生成器该索引。一旦生成了当前元组的所有索引,更高级别的生成器就会移动到下一个元组并重复该过程。
可能有点笨拙,但这是代码 (Python 3.6)。
from typing import TypeVar, Generator, Tuple, Iterator, Optional
TYPE_A = TypeVar("TYPE_A")
def _next_value(source: Iterator[Tuple[TYPE_A, ...]], size: int) -> Generator[Tuple[TYPE_A, ...], Optional[int], None]:
checked = [False for _ in range(size)]
value = next(source)
while True:
index = yield value
if all(checked):
value = next(source)
for _i in range(len(checked)):
checked[_i] = False
checked[index] = True
def _sub_iterator(index: int, callback: Generator[Tuple[TYPE_A, ...], int, None]) -> Generator[TYPE_A, None, None]:
while True:
value = callback.send(index)
yield value[index]
def split_iterator(source: Iterator[Tuple[TYPE_A, ...]], size: int) -> Tuple[Generator[TYPE_A, Optional[TYPE_A], None], ...]:
generators = []
_cb = _next_value(source, size)
_cb.send(None)
for _i in range(size):
each_generator = _sub_iterator(_i, _cb)
generators.append(each_generator)
return tuple(generators)
if __name__ == "__main__":
def triple():
_i = 0
while True:
yield tuple(range(_i, _i + 3))
_i += 1
g = triple()
for i, each_value in enumerate(g):
if i >= 5:
break
print(each_value)
print()
g = triple()
a_gen, b_gen, c_gen = split_iterator(g, 3)
for i, (a_value, b_value, c_value) in enumerate(zip(a_gen, b_gen, c_gen)):
if i >= 5:
break
print((a_value, b_value, c_value))
triple()
是一个 3 元组生成器,split_iterator()
生成三个生成器,每个生成器从 triple()
生成的元组中生成一个索引。每个人 _sub_iterator
只有在当前元组的所有值都已产生后才会进步。
我有一个生成器,大致如下:
def gen1():
for x, y in enumerate(xrange(20)):
a = 5*x
b = 10*y
yield a, b
从这个生成器,我想创建 2 个独立的生成器,如下所示:
for a in gen1_split_a():
yield a
for b in gen1_split_b():
yield b
我的玩法是什么,SA?
你不能,不是没有结束所有生成器输出只是为了能够在第二个循环中产生 b
值。这在内存方面可能会变得昂贵。
您将 itertools.tee()
用于 'duplicate' 生成器:
from itertools import tee
def split_gen(gen):
gen_a, gen_b = tee(gen, 2)
return (a for a, b in gen_a), (b for a, b in gen_b)
gen1_split_a, gen1_split_b = split_gen(gen1)
for a in gen1_split_a:
print a
for b in gen1_split_b:
print b
但在这种情况下发生的是 tee
对象最终将不得不存储 所有 gen1
产生的东西 。来自文档:
This itertool may require significant auxiliary storage (depending on how much temporary data needs to be stored). In general, if one iterator uses most or all of the data before another iterator starts, it is faster to use
list()
instead oftee()
.
按照该建议,只需将 b
值放入第二个循环的列表中:
b_values = []
for a, b in gen1():
print a
b_values.append(a)
for b in b_values:
print b
或者更好的是,只需在一个循环中同时处理 a
和 b
。
我有一个解决方案可能不是您想要的。它将 n
元组生成器分成 n
个单独生成器的元组。 但是,它要求当前元组的每个单独值都已返回才能继续下一个元组。严格来说,它 "splits" 一个 n
元组生成器到 n
个生成器,但是你的例子不会像展示的那样工作。
它利用 Python 将值发送回生成器以影响未来收益的能力。同样的想法也应该可以用 类 来实现,但我还是想掌握生成器。
当新的生成器被初始化时,它们只知道当前的n
元组。每次它们在各自的索引处产生值时,都会执行一个回调,通知更高级别的生成器该索引。一旦生成了当前元组的所有索引,更高级别的生成器就会移动到下一个元组并重复该过程。
可能有点笨拙,但这是代码 (Python 3.6)。
from typing import TypeVar, Generator, Tuple, Iterator, Optional
TYPE_A = TypeVar("TYPE_A")
def _next_value(source: Iterator[Tuple[TYPE_A, ...]], size: int) -> Generator[Tuple[TYPE_A, ...], Optional[int], None]:
checked = [False for _ in range(size)]
value = next(source)
while True:
index = yield value
if all(checked):
value = next(source)
for _i in range(len(checked)):
checked[_i] = False
checked[index] = True
def _sub_iterator(index: int, callback: Generator[Tuple[TYPE_A, ...], int, None]) -> Generator[TYPE_A, None, None]:
while True:
value = callback.send(index)
yield value[index]
def split_iterator(source: Iterator[Tuple[TYPE_A, ...]], size: int) -> Tuple[Generator[TYPE_A, Optional[TYPE_A], None], ...]:
generators = []
_cb = _next_value(source, size)
_cb.send(None)
for _i in range(size):
each_generator = _sub_iterator(_i, _cb)
generators.append(each_generator)
return tuple(generators)
if __name__ == "__main__":
def triple():
_i = 0
while True:
yield tuple(range(_i, _i + 3))
_i += 1
g = triple()
for i, each_value in enumerate(g):
if i >= 5:
break
print(each_value)
print()
g = triple()
a_gen, b_gen, c_gen = split_iterator(g, 3)
for i, (a_value, b_value, c_value) in enumerate(zip(a_gen, b_gen, c_gen)):
if i >= 5:
break
print((a_value, b_value, c_value))
triple()
是一个 3 元组生成器,split_iterator()
生成三个生成器,每个生成器从 triple()
生成的元组中生成一个索引。每个人 _sub_iterator
只有在当前元组的所有值都已产生后才会进步。