如何在 Python 中为 Apache Beam 提供有用的辅助输入? AsDict 对象不可订阅?
How do I make a useful side input I can access for Apache Beam in Python? AsDict object not subscriptable?
好消息,我使用的是批处理,而不是流式处理。我还是不明白这是怎么回事。显然 pvalue.AsDict() 实际上并没有给你一个字典值?只是 PCollection 的包装器。我怎么才能把它变成一个字典,这样我就可以使用它了?
此代码在 init 方法上失败,就在我尝试像访问字典一样访问操作时。
错误是
File "[path]", line 40, in process
location_indexes = [x[0] for x in self.operations["lookup_location_id"]]
TypeError: 'AsDict' object is not subscriptable [while running 'ParDo(Locations)']
罪魁祸首
class Locations(beam.DoFn): # Location_ID
def __init__(self, operations: dict):
self.locations: list = operations["lookup_location_id"]
def process(self, element: str):
# I have code here, not relevant
这是我称之为位置的地方...
locations = (
csv_data
| beam.ParDo(Locations(operations=beam.pvalue.AsDict(operations)))
| "Dedup locations" >> beam.Distinct()
)
operations 是元组的集合。这是管道:
operations = (
transforms
| ParDo(Semantics(headers))
| GroupByKey()
headers 实际上是一个普通的 ol' 列表。所以它作为 SideInput 效果很好。
语义为我提供了一些元组。
class Semantics(beam.DoFn):
def __init__(self, headers: list):
self.headers = headers
def process(self, element: list):
key = element[0]
value: list = [self.headers.index(element[2]), element[3]]
yield key, value
我还查看了调试器中的 AsDict 操作对象。这是一团糟,我不知道我应该如何从中提取真正的价值。有人可以帮忙吗?
DoFn class 的 __init__
部分在创建时是 运行,因此您无法获取从侧输入生成的数据。
你需要做的是将侧输入视图传递给process
方法。使用我自己的例子:
class ChangeCurrency(beam.DoFn):
def process(self, value, ratios):
current = value["currency"]
exchanged = {"Original": current}
for key in ratios[current]:
exchanged[key] = value["amount"] * ratios[current][key]
return [exchanged]
{..}
pipeline | ParDo(ChangeCurrency(), ratios=beam.pvalue.AsDict(rates_pc))
仅供参考,您不需要 class,您可以使用一个简单的函数:
def change_currency(value, ratios):
current = value["currency"]
exchanged = {"Original": current}
for key in ratios[current]:
exchanged[key] = value["amount"] * ratios[current][key]
return [exchanged]
{..}
pipeline | ParDo(change_currency, ratios=beam.pvalue.AsDict(rates_pc))
好消息,我使用的是批处理,而不是流式处理。我还是不明白这是怎么回事。显然 pvalue.AsDict() 实际上并没有给你一个字典值?只是 PCollection 的包装器。我怎么才能把它变成一个字典,这样我就可以使用它了?
此代码在 init 方法上失败,就在我尝试像访问字典一样访问操作时。
错误是
File "[path]", line 40, in process
location_indexes = [x[0] for x in self.operations["lookup_location_id"]]
TypeError: 'AsDict' object is not subscriptable [while running 'ParDo(Locations)']
罪魁祸首
class Locations(beam.DoFn): # Location_ID
def __init__(self, operations: dict):
self.locations: list = operations["lookup_location_id"]
def process(self, element: str):
# I have code here, not relevant
这是我称之为位置的地方...
locations = (
csv_data
| beam.ParDo(Locations(operations=beam.pvalue.AsDict(operations)))
| "Dedup locations" >> beam.Distinct()
)
operations 是元组的集合。这是管道:
operations = (
transforms
| ParDo(Semantics(headers))
| GroupByKey()
headers 实际上是一个普通的 ol' 列表。所以它作为 SideInput 效果很好。 语义为我提供了一些元组。
class Semantics(beam.DoFn):
def __init__(self, headers: list):
self.headers = headers
def process(self, element: list):
key = element[0]
value: list = [self.headers.index(element[2]), element[3]]
yield key, value
我还查看了调试器中的 AsDict 操作对象。这是一团糟,我不知道我应该如何从中提取真正的价值。有人可以帮忙吗?
DoFn class 的 __init__
部分在创建时是 运行,因此您无法获取从侧输入生成的数据。
你需要做的是将侧输入视图传递给process
方法。使用我自己的例子:
class ChangeCurrency(beam.DoFn):
def process(self, value, ratios):
current = value["currency"]
exchanged = {"Original": current}
for key in ratios[current]:
exchanged[key] = value["amount"] * ratios[current][key]
return [exchanged]
{..}
pipeline | ParDo(ChangeCurrency(), ratios=beam.pvalue.AsDict(rates_pc))
仅供参考,您不需要 class,您可以使用一个简单的函数:
def change_currency(value, ratios):
current = value["currency"]
exchanged = {"Original": current}
for key in ratios[current]:
exchanged[key] = value["amount"] * ratios[current][key]
return [exchanged]
{..}
pipeline | ParDo(change_currency, ratios=beam.pvalue.AsDict(rates_pc))