将 Apache Beam PCollection 一分为二的速度和内存权衡
Speed and memory tradeoffs splitting Apache Beam PCollection in two
我有一个 PCollection,其中每个元素都是一个键值元组,如下所示:(key, (value1,..,value_n) )
我需要将这个 PCollection 分成两个处理分支。
一如既往,我需要整个管道尽可能快并使用尽可能少的 ram。
我想到了两个想法:
选项 1:使用具有多个输出的 DoFn 拆分 PColl
class SplitInTwo(beam.DoFn):
def process(self, kvpair):
key, values = kvpair
yield beam.TaggedOutput('left', (key, values[0:2]))
yield beam.TaggedOutput('right', (key, values[2:]))
class ProcessLeft(beam.DoFn):
def process(self, kvpair):
key,values = kvpair
...
yield (key, results)
# class ProcessRight is similar to ProcessLeft
然后像这样构建管道
splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
left = splitme.left | beam.ParDo(ProcessLeft())
right = splitme.right | beam.ParDo(ProcessRight())
选项 2:在原始 PCollection 上使用两个不同的 DoFn
另一种选择是使用两个 DoFns 来读取和处理同一个 PCollection。
仅对数据的 'left' 和 'right' 手边使用一个:
class ProcessLeft(beam.DoFn):
def process(self, kvpair):
key = kvpair[0]
values = kvpair[0][0:2]
...
yield (key,result)
# class ProcessRight is similar to ProcessLeft
构建流水线更简单...(而且您不需要跟踪您拥有哪些标记输出):
left = pcoll | beam.ParDo(ProcessLeft())
right = pcoll| beam.ParDo(ProcessRight())
但是...它更快吗?需要的内存会比第一个少吗?
(我在考虑第一个选项可能会被运行器融合——而不仅仅是数据流运行器)。
在这种情况下,两个选项都将由跑步者融合,因此两个选项在性能方面会有些相似。如果你想重新洗牌数据到单独的worker中,那么选项1是你最好的选择,因为ProcessLeft
读取的序列化集合ProcessRight
会更小。
splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
left = splitme.left | beam.Reshuffle() | beam.ParDo(ProcessLeft())
right = splitme.right | beam.Reshuffle() | beam.ParDo(ProcessRight())
Reshuffle
转换将确保您的数据写入中间洗牌,然后在下游使用。这会破坏融合。
我有一个 PCollection,其中每个元素都是一个键值元组,如下所示:(key, (value1,..,value_n) )
我需要将这个 PCollection 分成两个处理分支。
一如既往,我需要整个管道尽可能快并使用尽可能少的 ram。
我想到了两个想法:
选项 1:使用具有多个输出的 DoFn 拆分 PColl
class SplitInTwo(beam.DoFn):
def process(self, kvpair):
key, values = kvpair
yield beam.TaggedOutput('left', (key, values[0:2]))
yield beam.TaggedOutput('right', (key, values[2:]))
class ProcessLeft(beam.DoFn):
def process(self, kvpair):
key,values = kvpair
...
yield (key, results)
# class ProcessRight is similar to ProcessLeft
然后像这样构建管道
splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
left = splitme.left | beam.ParDo(ProcessLeft())
right = splitme.right | beam.ParDo(ProcessRight())
选项 2:在原始 PCollection 上使用两个不同的 DoFn
另一种选择是使用两个 DoFns 来读取和处理同一个 PCollection。 仅对数据的 'left' 和 'right' 手边使用一个:
class ProcessLeft(beam.DoFn):
def process(self, kvpair):
key = kvpair[0]
values = kvpair[0][0:2]
...
yield (key,result)
# class ProcessRight is similar to ProcessLeft
构建流水线更简单...(而且您不需要跟踪您拥有哪些标记输出):
left = pcoll | beam.ParDo(ProcessLeft())
right = pcoll| beam.ParDo(ProcessRight())
但是...它更快吗?需要的内存会比第一个少吗?
(我在考虑第一个选项可能会被运行器融合——而不仅仅是数据流运行器)。
在这种情况下,两个选项都将由跑步者融合,因此两个选项在性能方面会有些相似。如果你想重新洗牌数据到单独的worker中,那么选项1是你最好的选择,因为ProcessLeft
读取的序列化集合ProcessRight
会更小。
splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
left = splitme.left | beam.Reshuffle() | beam.ParDo(ProcessLeft())
right = splitme.right | beam.Reshuffle() | beam.ParDo(ProcessRight())
Reshuffle
转换将确保您的数据写入中间洗牌,然后在下游使用。这会破坏融合。