Apache-Beam 将序列号添加到 PCollection

Apache-Beam add sequence number to a PCollection

我正在尝试构建一个 ETL 来加载维度 table。我使用 Apache Bea,使用 Python、DataFlow 和 BigQuery。

我需要为 pcollection 的每个元素分配一个序列号,以便将其加载到 BigQuery 中,但我找不到任何方法来执行此操作。

我认为我需要 DataFlow 来进行先前的聚合和连接以获得我的最终 pcollection 以添加序列号,但此时我需要停止并行处理并将我的 pcollection 转换为列表(如在 Spark 中时你使用 .collect()) 然后做一个简单的循环来分配序列号。对吗?

这是我编写的管道:

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

我了解到无法从 pcollection 获取列表:

如何实现?有帮助吗?

如果您想要获得包含 PCollection 中每个元素的列表,您可以使用辅助输入。请记住,这将从您的结果中删除所有并行性,并且您的管道可能会变慢。

如果您仍想这样做,那么:

side_input_coll = beam.pvalue.AsIterable(my_collection)

(p 
 | beam.Create([0]) 
 | beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)],
               my_seq=side_input_coll))

但是不要忘记,为了保持并行性,最好只是生成一个随机 ID。请记住 PCollections 本质上是无序的。

要了解有关侧输入的更多信息,请参阅 Beam Programming Guide on Side Inputs