链接嵌套收益率
Chaining Nested Yields
我想构建一个数据管道,对数据系列的行执行一系列操作。
大多数函数将在一行输入、一行输出的基础上工作,但其中一些操作将 "expand" 系列 - 我的意思是一行将进入函数并作为该功能的结果,可能会生成不止一行。
我想设置一个功能链,这些功能足够强大,可以自行处理这种行为,而无需编写一堆监督代码。
使用 yield
是一个机会——如果每个函数都消耗前一个函数的收益,并且自己充当生成器,那么我可以任意地将这些格式良好的函数链接在一起——从优雅的角度来看,这会很好。
这是我的设置代码,func_x
充当简单的 1-1 函数,func_y
进行扩展。
from collections import OrderedDict
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def row_getter(source):
for content in source:
yield content.copy()
def func_x(row):
try:
q=next(row)
if q['name']=="Tom":
q['name']="Richard"
yield q.copy()
except StopIteration:
print ("Stop x")
def func_y(row):
try:
q=next(row)
for thingy in range(0,2):
q['thingy']=thingy
yield q.copy()
except StopIteration:
print ("Stop y")
rg = row_getter(data_source)
iter_func = func_y(func_x(rg))
现在,我可以通过遍历 iter_func 对象来获取第一组数据:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 0)])
再一次:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 1)])
再一次,虽然这一次,我没有看到史蒂夫的记录(即流程中的下一条记录,现在第一条记录上 func_y 的扩展已经完成)我得到了 StopIteration
错误。
print (next(iter_func))
>> StopIteration Traceback (most recent call last)
<ipython-input-15-0fd1ed48c61b> in <module>()
----> 1 print (next(iter_func))
StopIteration:
所以我不明白这是从哪里来的,因为我试图在 func_x
和 func_y
中捕获它们。
你的 func_x
函数只产生一个项目,所以它在那个被消耗后完成。尝试这样的事情:
def func_x(row):
try:
for q in row:
if q['name']=="Tom":
q['name']="Richard"
yield q
except StopIteration:
print ("Stop x")
顺便说一下,请注意每个 yielding 确实 而不是 复制对象。在许多情况下这可能没问题,但请注意,在 func_y
中,您两次生成每个相同的对象,将 'thingy'
设置为不同的值。这意味着,例如,如果您这样做(在您发布的代码之后):
d1 = next(iter_func)
d2 = next(iter_func)
d1
和 d2
将是同一个对象,特别是它们都将 'thingy'
设置为 1
。
内置工具(特别是 map
和 itertools.chain
)可以为您完成这项工作。
from collections import OrderedDict
from itertools import chain
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def rename(d):
if d['name'] == "Tom":
d['name'] = "Richard"
return d
def add_thingy(d):
for y in range(2):
yield {'thingy': y, **d}
for x in chain.from_iterable(add_thingy(d)
for d in map(rename,
data_source)):
print(x)
map
不是必须的;我们可以将 rename
应用于每个 dict
,然后再将其传递给生成器表达式中的 add_thingy
。
for x in chain.from_iterable(add_thingy(rename(d)) for d in data_source):
print(x)
或者换个方向使用 map
两次:
for x in chain.from_iterable(map(add_thingy, map(rename, data_source))):
print(x)
我想构建一个数据管道,对数据系列的行执行一系列操作。
大多数函数将在一行输入、一行输出的基础上工作,但其中一些操作将 "expand" 系列 - 我的意思是一行将进入函数并作为该功能的结果,可能会生成不止一行。
我想设置一个功能链,这些功能足够强大,可以自行处理这种行为,而无需编写一堆监督代码。
使用 yield
是一个机会——如果每个函数都消耗前一个函数的收益,并且自己充当生成器,那么我可以任意地将这些格式良好的函数链接在一起——从优雅的角度来看,这会很好。
这是我的设置代码,func_x
充当简单的 1-1 函数,func_y
进行扩展。
from collections import OrderedDict
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def row_getter(source):
for content in source:
yield content.copy()
def func_x(row):
try:
q=next(row)
if q['name']=="Tom":
q['name']="Richard"
yield q.copy()
except StopIteration:
print ("Stop x")
def func_y(row):
try:
q=next(row)
for thingy in range(0,2):
q['thingy']=thingy
yield q.copy()
except StopIteration:
print ("Stop y")
rg = row_getter(data_source)
iter_func = func_y(func_x(rg))
现在,我可以通过遍历 iter_func 对象来获取第一组数据:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 0)])
再一次:
print (next(iter_func))
>> OrderedDict([('id', '1'), ('name', 'Richard'), ('sync', 'a'), ('thingy', 1)])
再一次,虽然这一次,我没有看到史蒂夫的记录(即流程中的下一条记录,现在第一条记录上 func_y 的扩展已经完成)我得到了 StopIteration
错误。
print (next(iter_func))
>> StopIteration Traceback (most recent call last)
<ipython-input-15-0fd1ed48c61b> in <module>()
----> 1 print (next(iter_func))
StopIteration:
所以我不明白这是从哪里来的,因为我试图在 func_x
和 func_y
中捕获它们。
你的 func_x
函数只产生一个项目,所以它在那个被消耗后完成。尝试这样的事情:
def func_x(row):
try:
for q in row:
if q['name']=="Tom":
q['name']="Richard"
yield q
except StopIteration:
print ("Stop x")
顺便说一下,请注意每个 yielding 确实 而不是 复制对象。在许多情况下这可能没问题,但请注意,在 func_y
中,您两次生成每个相同的对象,将 'thingy'
设置为不同的值。这意味着,例如,如果您这样做(在您发布的代码之后):
d1 = next(iter_func)
d2 = next(iter_func)
d1
和 d2
将是同一个对象,特别是它们都将 'thingy'
设置为 1
。
内置工具(特别是 map
和 itertools.chain
)可以为您完成这项工作。
from collections import OrderedDict
from itertools import chain
data_source = [ OrderedDict({"id" : "1", "name" : "Tom", "sync" : "a"}),
OrderedDict({"id" : "2", "name" : "Steve", "sync" : "a"}),
OrderedDict({"id" : "3", "name" : "Ulrich", "sync" : "b"}),
OrderedDict({"id" : "4", "name" : "Victor", "sync" : "b"}),
OrderedDict({"id" : "5", "name" : "Wolfgang", "sync" : "c"}),
OrderedDict({"id" : "6", "name" : "Xavier", "sync" : "c"}),
OrderedDict({"id" : "7", "name" : "Yves", "sync" : "c"}),
OrderedDict({"id" : "8", "name" : "Zaphod", "sync" : "d"})]
def rename(d):
if d['name'] == "Tom":
d['name'] = "Richard"
return d
def add_thingy(d):
for y in range(2):
yield {'thingy': y, **d}
for x in chain.from_iterable(add_thingy(d)
for d in map(rename,
data_source)):
print(x)
map
不是必须的;我们可以将 rename
应用于每个 dict
,然后再将其传递给生成器表达式中的 add_thingy
。
for x in chain.from_iterable(add_thingy(rename(d)) for d in data_source):
print(x)
或者换个方向使用 map
两次:
for x in chain.from_iterable(map(add_thingy, map(rename, data_source))):
print(x)