Apache Beam - use output of one aggregation in another aggregation

I'm new to Apache Beam, coming from PySpark's dataframe API. I can't figure out how to use the output of one Beam computation inside another Beam computation. Basically, I want to perform an aggregation that produces a single value (e.g. a mean), and then use the result of that aggregation as a Python primitive (e.g. a float) in a subsequent aggregation. For example:

import apache_beam as beam

DATA = [
  beam.Row(val="hello"),
  beam.Row(val="stackoverflow,"),
  beam.Row(val="plz"),
  beam.Row(val="halp"),
]

with beam.Pipeline() as pipe:
  graph = pipe | beam.Create(DATA)
  average_word_length = (graph 
    | "Get lengths" >> beam.ParDo(lambda row: beam.Row(length=len(row.val)))
    | "Compute mean" >> beam.combiners.Mean.Globally()
    | "Print avg" >> beam.Map(print)
  )
  # average_word_length is a PCollection with only one value: 6.5

  (graph 
    | "Compute metric" >> beam.ParDo(lambda row: beam.Row(newval=len(row.val)/average_word_length)) # fails here
    | beam.Map(print)
  )

This fails with a TypeError, because I'm trying to divide an int by a PCollection... Is there a way to extract a float value from the average_word_length PCollection and use it as a float in the next aggregation? If not, how can I achieve something similar?

What you need is a Side Input. Here is the programming guide section on them.

Working solution (note that I removed one of the prints so that there is actual output):

import apache_beam as beam
from apache_beam import pvalue


DATA = [
  beam.Row(val="HOPE"),
  beam.Row(val="THIS,"),
  beam.Row(val="WORKS"),
  beam.Row(val="BLABLABLA"),
]

with beam.Pipeline() as pipe:
  graph = pipe | beam.Create(DATA)
  average_word_length = (graph 
    | "Get lengths" >> beam.ParDo(lambda row: beam.Row(length=len(row.val)))
    | "Compute mean" >> beam.combiners.Mean.Globally()
  )
  # average_word_length is a PCollection with only one value: 5.75

  (graph 
    | "Compute metric" >> beam.ParDo(lambda row, side: beam.Row(newval=len(row.val)/side),
                                     side=pvalue.AsSingleton(average_word_length))
    | beam.Map(print)
  )