展平 (key, List[List]) PCollection 中的列表列表,保留密钥
flatten list of lists in (key, List[List]) PCollection, retaining the key
我有一个(键,值)对的 PCollection,其中值本身就是一个列表
{'apple': ['1','2'],
'watermelon': ['4','5']
'apple': ['9','10']
}
我想获得另一个具有相同键的集合,但列表是原始集合的 GroupByKey
,但结果值是扁平化的。
{'apple': ['1','2','9','10'],
'watermelon': ['4','5']
}
而不是
{'apple': [['1','2], ['9','10']],
'watermelon': [['4','5']]
}
我已经尝试了一些方法,但我不清楚如何去做。
我想人们总是可以编写一个 ParDo
函数来在每个元素的基础上展平列表,但我觉得必须有一个更简单的解决方案。
您正在寻找的逻辑操作是“按键组合”,其中组合器是列表串联(在 Python 中称为 +
)。
根据数据的特性,您可能希望使用新的 CombineFn
来实现此操作,该操作会改变列表作为其累加器。这几乎等同于执行 GroupByKey
,然后在 ParDo
.
中展平列表
执行上的区别在于 Combine
操作可以在打乱数据之前执行,因为 Beam runner 知道它是关联和交换操作。这样的话,不会减少shuffle的数据量,所以不是那么重要。
你在找这样的东西吗
import apache_beam as beam
class Combiner(beam.CombineFn):
def create_accumulator(self, *args, **kwargs):
return {}
def add_input(self, acc, element):
key = element[0]
value = element[1]
if key in acc:
value.extend(acc[key])
acc[key]=value
return acc
def merge_accumulators(self, accumulators):
return accumulators
def extract_output(self, accumulator):
return accumulator
with beam.Pipeline() as p:
pipe =(
p
|'create'>> beam.Create(
[
('apple', ['1','2']),
('watermelon', ['4','5']),
('apple', ['9','10']),
('watermelon', ['49','50'])
]
)
|'combiner'>>beam.CombineGlobally(Combiner())
|'print'>>beam.Map(print)
)
#o/p - [{'apple': ['9', '10', '1', '2'], 'watermelon': ['49', '50', '4', '5']}]
我有一个(键,值)对的 PCollection,其中值本身就是一个列表
{'apple': ['1','2'],
'watermelon': ['4','5']
'apple': ['9','10']
}
我想获得另一个具有相同键的集合,但列表是原始集合的 GroupByKey
,但结果值是扁平化的。
{'apple': ['1','2','9','10'],
'watermelon': ['4','5']
}
而不是
{'apple': [['1','2], ['9','10']],
'watermelon': [['4','5']]
}
我已经尝试了一些方法,但我不清楚如何去做。
我想人们总是可以编写一个 ParDo
函数来在每个元素的基础上展平列表,但我觉得必须有一个更简单的解决方案。
您正在寻找的逻辑操作是“按键组合”,其中组合器是列表串联(在 Python 中称为 +
)。
根据数据的特性,您可能希望使用新的 CombineFn
来实现此操作,该操作会改变列表作为其累加器。这几乎等同于执行 GroupByKey
,然后在 ParDo
.
执行上的区别在于 Combine
操作可以在打乱数据之前执行,因为 Beam runner 知道它是关联和交换操作。这样的话,不会减少shuffle的数据量,所以不是那么重要。
你在找这样的东西吗
import apache_beam as beam
class Combiner(beam.CombineFn):
def create_accumulator(self, *args, **kwargs):
return {}
def add_input(self, acc, element):
key = element[0]
value = element[1]
if key in acc:
value.extend(acc[key])
acc[key]=value
return acc
def merge_accumulators(self, accumulators):
return accumulators
def extract_output(self, accumulator):
return accumulator
with beam.Pipeline() as p:
pipe =(
p
|'create'>> beam.Create(
[
('apple', ['1','2']),
('watermelon', ['4','5']),
('apple', ['9','10']),
('watermelon', ['49','50'])
]
)
|'combiner'>>beam.CombineGlobally(Combiner())
|'print'>>beam.Map(print)
)
#o/p - [{'apple': ['9', '10', '1', '2'], 'watermelon': ['49', '50', '4', '5']}]