Error using Python Apache Beam/Dataflow side inputs
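I ran into a problem using Apache Beam side inputs, specifically AsSingleton. The code is very simple. I'm trying to filter a list of orders so that only the above-average ones remain. To do that, I compute average_order_amount and use it as a side input to the filter operation on the main 'orders' PCollection. However, I get the error "TypeError: '>' not supported between instances of 'float' and 'AsSingleton'". It seems the side input value can't be compared with anything, here a float. Casting the singleton to float doesn't work either. Here is code that reproduces the error (it is raised when the order amount ord2[1] is compared with the side input):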
import apache_beam as beam
p = beam.Pipeline()
orders = (p | beam.Create([(1, 12.6), (2, 8.7), (3, 41.2), (4, 15.0)]) )
average_order_amount = orders | 'Extract order amount' >> beam.Map(lambda ord: float(ord[1])) | 'Find mean order amount' >> beam.combiners.Mean.Globally()
above_average_orders = orders | 'Keep only high orders' >> beam.Filter(lambda ord2: ord2[1] > beam.pvalue.AsSingleton(average_order_amount))
above_average_orders | 'Write results to file' >> beam.io.WriteToText('/tmp/high_orders')
p.run()
In this case, the AsSingleton object should be passed to your Filter like this:
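import apache_beam as beam

p = beam.Pipeline()
orders = (p
          | beam.Create([(1, 12.6), (2, 8.7), (3, 41.2), (4, 15.0)]))

# Wrap the single-element mean PCollection as a side input.
average_order_amount = beam.pvalue.AsSingleton(
    orders
    | 'Extract order amount' >> beam.Map(lambda ord: float(ord[1]))
    | 'Find mean order amount' >> beam.combiners.Mean.Globally())

# Pass the side input as an extra argument to Filter; Beam supplies its
# materialized value as the lambda's second parameter (avg).
above_average_orders = (
    orders
    | 'Keep only high orders' >> beam.Filter(lambda ord2, avg: ord2[1] > avg,
                                             average_order_amount))
above_average_orders | 'Write results to file' >> beam.io.WriteToText('/tmp/high_orders')

# Block until the pipeline finishes (needed on asynchronous runners such as Dataflow).
p.run().wait_until_finish()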
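You pass the AsSingleton object to Filter as the argument after the function, and add an extra parameter to the lambda; when the transform runs, Beam fills that parameter with the computed mean. AsSingleton on its own is only a wrapper around the PCollection, which is why constructing it inside the lambda and comparing it with a float raises the TypeError. The same pattern applies to other transforms such as Map, FlatMap, ParDo, etc.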