展平 (key, List[List]) PCollection 中的列表列表,保留密钥

flatten list of lists in (key, List[List]) PCollection, retaining the key

我有一个(键,值)对的 PCollection,其中值本身就是一个列表

{'apple': ['1','2'],
 'watermelon': ['4','5']
 'apple': ['9','10']
 }

我想获得另一个具有相同键的集合,但列表是原始集合的 GroupByKey,但结果值是扁平化的。

{'apple': ['1','2','9','10'],
 'watermelon': ['4','5']
 }

而不是

{'apple': [['1','2], ['9','10']],
 'watermelon': [['4','5']]
 }

我已经尝试了一些方法,但我不清楚如何去做。

我想人们总是可以编写一个 ParDo 函数来在每个元素的基础上展平列表,但我觉得必须有一个更简单的解决方案。

您正在寻找的逻辑操作是“按键组合”,其中组合器是列表串联(在 Python 中称为 +)。

根据数据的特性,您可能希望使用新的 CombineFn 来实现此操作,该操作会改变列表作为其累加器。这几乎等同于执行 GroupByKey,然后在 ParDo.

中展平列表

执行上的区别在于 Combine 操作可以在打乱数据之前执行,因为 Beam runner 知道它是关联和交换操作。这样的话,不会减少shuffle的数据量,所以不是那么重要。

你在找这样的东西吗

import apache_beam as beam


class Combiner(beam.CombineFn):

    def create_accumulator(self, *args, **kwargs):
        return {}
    
    def add_input(self, acc, element):
        key = element[0]
        value = element[1]
        if key in acc:
            value.extend(acc[key])
        acc[key]=value
        return acc
    
    def merge_accumulators(self, accumulators):
        return accumulators

    def extract_output(self, accumulator):
        return accumulator

with beam.Pipeline() as p:
    pipe =(
        p
          |'create'>> beam.Create(
                
                [
                ('apple', ['1','2']),
                ('watermelon', ['4','5']),
                ('apple', ['9','10']),
                ('watermelon', ['49','50'])
                ]
                
 )
          |'combiner'>>beam.CombineGlobally(Combiner())
          |'print'>>beam.Map(print)
             
    )

#o/p - [{'apple': ['9', '10', '1', '2'], 'watermelon': ['49', '50', '4', '5']}]