如何在 Python 中为 Apache Beam 提供有用的辅助输入? AsDict 对象不可订阅?

How do I make a useful side input I can access for Apache Beam in Python? AsDict object not subscriptable?

好消息,我使用的是批处理,而不是流式处理。我还是不明白这是怎么回事。显然 pvalue.AsDict() 实际上并没有给你一个字典值?只是 PCollection 的包装器。我怎么才能把它变成一个字典,这样我就可以使用它了?

此代码在 init 方法上失败,就在我尝试像访问字典一样访问操作时。

错误是

File "[path]", line 40, in process
    location_indexes = [x[0] for x in self.operations["lookup_location_id"]]
TypeError: 'AsDict' object is not subscriptable [while running 'ParDo(Locations)']

罪魁祸首

class Locations(beam.DoFn):  # Location_ID
    def __init__(self, operations: dict):
        self.locations: list = operations["lookup_location_id"]

    def process(self, element: str):
        # I have code here, not relevant

这是我称之为位置的地方...

locations = (
            csv_data
            | beam.ParDo(Locations(operations=beam.pvalue.AsDict(operations)))
            | "Dedup locations" >> beam.Distinct()
        )

operations 是元组的集合。这是管道:

operations = (
            transforms
            | ParDo(Semantics(headers))
            | GroupByKey()

headers 实际上是一个普通的 ol' 列表。所以它作为 SideInput 效果很好。 语义为我提供了一些元组。

class Semantics(beam.DoFn):
    def __init__(self, headers: list):
        self.headers = headers

    def process(self, element: list):
        key = element[0]
        value: list = [self.headers.index(element[2]), element[3]]
        yield key, value

我还查看了调试器中的 AsDict 操作对象。这是一团糟,我不知道我应该如何从中提取真正的价值。有人可以帮忙吗?

DoFn class 的 __init__ 部分在创建时是 运行,因此您无法获取从侧输入生成的数据。

你需要做的是将侧输入视图传递给process方法。使用我自己的例子:

class ChangeCurrency(beam.DoFn):    
    def process(self, value, ratios):
        current = value["currency"]
        exchanged = {"Original": current}
        for key in ratios[current]:
            exchanged[key] = value["amount"] * ratios[current][key]
        return [exchanged]  


{..}
pipeline | ParDo(ChangeCurrency(), ratios=beam.pvalue.AsDict(rates_pc))

仅供参考,您不需要 class,您可以使用一个简单的函数:

def change_currency(value, ratios):
    current = value["currency"]
    exchanged = {"Original": current}
    for key in ratios[current]:
        exchanged[key] = value["amount"] * ratios[current][key]
    return [exchanged]

{..}

pipeline | ParDo(change_currency, ratios=beam.pvalue.AsDict(rates_pc))