基于另一个生成器的生成器
Generator that is based on another generator
我的任务其实很简单,但我不知道如何实现它。我打算在我的 ML 算法中使用它,但让我们简化示例。假设有一个像下面这样的生成器:
nums = ((i+1) for i in range(4))
以上,将产生 1
、2
、3
和 4
。
假设上述生成器return的个体"samples"。我想编写一个生成器方法来对它们进行批处理。假设,批量大小为 2
。所以如果这个新方法被调用:
def batch_generator(batch_size):
do something on nums
yield batches of size batch_size
然后这个批处理生成器的输出将是:1
和 2
,然后是 3
和 4
。 Tuples/lists无所谓。重要的是如何 return 这些批次。我发现 yield from
关键字是在 Python 3.3 中引入的,但它似乎对我的情况没有用。
显然,如果我们有 5
nums 而不是 4
,并且 batch_size
是 2
,我们将忽略第一个生成器的最后产生的值。
我自己的解决方案可能是,
nums = (i+1 for i in range(4))
def giveBatch(gen, numOfItems):
try:
return [next(gen) for i in range(numOfItems)]
except StopIteration:
pass
giveBatch(nums, 2)
# [1, 2]
giveBatch(nums, 2)
# [3, 4]
另一种解决方案是使用 grouper
作为@Bharel 提到的。我比较了 运行 这两种解决方案所花费的时间。没有太大区别。估计可以忽略。
from timeit import timeit
def wrapper(func, *args, **kwargs):
def wrapped():
return func(*args, **kwargs)
return wrapped
nums = (i+1 for i in range(1000000))
wrappedGiveBatch = wrapper(giveBatch, nums, 2)
timeit(wrappedGiveBatch, number=1000000)
# ~ 0.998439
wrappedGrouper = wrapper(grouper, nums, 2)
timeit(wrappedGrouper, number=1000000)
# ~ 0.734342
在 itertools 下,您有一个代码片段可以做到这一点:
from itertools import zip_longest
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
您不必每次都调用一个方法,而是拥有一个 returns 批处理的迭代器,效率更高、速度更快,并且可以处理像 运行 这样过早地用完数据而不会丢失数据的极端情况。
这正是我需要的:
def giveBatch(numOfItems):
nums = (i+1 for i in range(7))
while True:
yield [next(nums) for i in range(numOfItems)]
我的任务其实很简单,但我不知道如何实现它。我打算在我的 ML 算法中使用它,但让我们简化示例。假设有一个像下面这样的生成器:
nums = ((i+1) for i in range(4))
以上,将产生 1
、2
、3
和 4
。
假设上述生成器return的个体"samples"。我想编写一个生成器方法来对它们进行批处理。假设,批量大小为 2
。所以如果这个新方法被调用:
def batch_generator(batch_size):
do something on nums
yield batches of size batch_size
然后这个批处理生成器的输出将是:1
和 2
,然后是 3
和 4
。 Tuples/lists无所谓。重要的是如何 return 这些批次。我发现 yield from
关键字是在 Python 3.3 中引入的,但它似乎对我的情况没有用。
显然,如果我们有 5
nums 而不是 4
,并且 batch_size
是 2
,我们将忽略第一个生成器的最后产生的值。
我自己的解决方案可能是,
nums = (i+1 for i in range(4))
def giveBatch(gen, numOfItems):
try:
return [next(gen) for i in range(numOfItems)]
except StopIteration:
pass
giveBatch(nums, 2)
# [1, 2]
giveBatch(nums, 2)
# [3, 4]
另一种解决方案是使用 grouper
作为@Bharel 提到的。我比较了 运行 这两种解决方案所花费的时间。没有太大区别。估计可以忽略。
from timeit import timeit
def wrapper(func, *args, **kwargs):
def wrapped():
return func(*args, **kwargs)
return wrapped
nums = (i+1 for i in range(1000000))
wrappedGiveBatch = wrapper(giveBatch, nums, 2)
timeit(wrappedGiveBatch, number=1000000)
# ~ 0.998439
wrappedGrouper = wrapper(grouper, nums, 2)
timeit(wrappedGrouper, number=1000000)
# ~ 0.734342
在 itertools 下,您有一个代码片段可以做到这一点:
from itertools import zip_longest
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
# grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx"
args = [iter(iterable)] * n
return zip_longest(*args, fillvalue=fillvalue)
您不必每次都调用一个方法,而是拥有一个 returns 批处理的迭代器,效率更高、速度更快,并且可以处理像 运行 这样过早地用完数据而不会丢失数据的极端情况。
这正是我需要的:
def giveBatch(numOfItems):
nums = (i+1 for i in range(7))
while True:
yield [next(nums) for i in range(numOfItems)]