Apache Beam - use output of one aggregation in another aggregation
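I'm new to Apache Beam, coming from PySpark's DataFrame API. I can't work out how to use the output of one Beam computation inside another Beam computation. Basically, I want to run an aggregation that yields a single value (e.g. a mean) and then use the result of that aggregation as a Python primitive (e.g. a float) in a subsequent aggregation. For example: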
import apache_beam as beam

DATA = [
    beam.Row(val="hello"),
    beam.Row(val="StackOverflow,"),
    beam.Row(val="plz"),
    beam.Row(val="halp"),
]

with beam.Pipeline() as pipe:
    graph = pipe | beam.Create(DATA)
    average_word_length = (graph
        | "Get lengths" >> beam.ParDo(lambda row: beam.Row(length=len(row.val)))
        | "Compute mean" >> beam.combiners.Mean.Globally()
        | "Print avg" >> beam.Map(print)
    )
    # average_word_length is a PCollection with only one value: 6.5
    (graph
        | "Compute metric" >> beam.ParDo(lambda row: beam.Row(newval=len(row.val)/average_word_length))  # fails here
        | beam.Map(print)
    )
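This fails with a TypeError because I'm trying to divide an int by a PCollection... Is there a way to extract a float from the average_word_length PCollection and use it as a float in the next aggregation? If not, how can I achieve something similar?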
What you need is a side input. Here is the programming guide section about them.
Working solution (note that I removed one of the prints so that there is an actual output there):
import apache_beam as beam
from apache_beam import pvalue

DATA = [
    beam.Row(val="HOPE"),
    beam.Row(val="THIS,"),
    beam.Row(val="WORKS"),
    beam.Row(val="BLABLABLA"),
]

with beam.Pipeline() as pipe:
    graph = pipe | beam.Create(DATA)
    average_word_length = (graph
        | "Get lengths" >> beam.ParDo(lambda row: beam.Row(length=len(row.val)))
        | "Compute mean" >> beam.combiners.Mean.Globally()
    )
    # average_word_length is a PCollection with only one value: 5.75
    (graph
        # The side input is passed as a keyword argument; Beam hands it to the
        # lambda as a plain float once the mean has been computed.
        | "Compute metric" >> beam.ParDo(lambda row, side: beam.Row(newval=len(row.val)/side),
                                         side=pvalue.AsSingleton(average_word_length))
        | beam.Map(print)
    )