Error using Python Apache Beam/Dataflow side inputs

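I'm running into a problem with Apache Beam side inputs, specifically AsSingleton. The code is very simple. I'm trying to filter a list of orders to keep only those above the average, so I compute average_order_amount and use it as a side input to the filter operation on the main 'orders' PCollection. But I get the error "TypeError: '>' not supported between instances of 'float' and 'AsSingleton'". It seems the side input value can't be compared with any other type, here a float. Casting the singleton to float doesn't work either. This code reproduces the error (it is raised when the order amount ord2[1] is compared with the side input):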

import apache_beam as beam
p = beam.Pipeline()
orders = (p | beam.Create([(1, 12.6), (2, 8.7), (3, 41.2), (4, 15.0)]) )
average_order_amount = orders | 'Extract order amount' >>  beam.Map(lambda ord: float(ord[1])) | 'Find mean order amount' >> beam.combiners.Mean.Globally()
above_average_orders = orders | 'Keep only high orders' >> beam.Filter(lambda ord2: ord2[1] > beam.pvalue.AsSingleton(average_order_amount))
above_average_orders | 'Write results to file' >> beam.io.WriteToText('/tmp/high_orders')
p.run()

In this case, the AsSingleton object should be passed to your Filter like this:

import apache_beam as beam
p = beam.Pipeline()
orders = (p 
          | beam.Create([(1, 12.6), (2, 8.7), (3, 41.2), (4, 15.0)]))
average_order_amount = beam.pvalue.AsSingleton(
    orders 
    | 'Extract order amount' >>  beam.Map(lambda ord: float(ord[1])) 
    | 'Find mean order amount' >> beam.combiners.Mean.Globally())

above_average_orders = (
    orders 
    | 'Keep only high orders' >> beam.Filter(lambda ord2, avg: ord2[1] > avg, 
                                             average_order_amount))
above_average_orders | 'Write results to file' >> beam.io.WriteToText('/tmp/high_orders')
p.run()

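You pass the AsSingleton object as the second argument to Filter and give the lambda an extra parameter; when the pipeline runs, Beam fills that parameter with the side input's actual value (here, the mean as a float). In the question's code, the lambda compares against the AsSingleton wrapper itself, which only describes the side input and is never replaced by its value, hence the TypeError.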

The same applies to other transforms, such as Map, FlatMap, and ParDo.