如何从 Google 数据流中的 PCollection 中获取元素列表并在管道中使用它来循环写入转换?
How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?
我正在使用 Google Cloud Dataflow 和 Python SDK。
我愿意:
- 从主 PCollection 中获取唯一日期列表
- 遍历该列表中的日期以创建过滤的 PCollection(每个都有一个唯一的日期),并将每个过滤的 PCollection 写入其在 BigQuery 中按时间分区 table 的分区。
我怎样才能得到那个名单?在以下组合转换之后,我创建了一个 ListPCollectionView 对象,但我无法迭代该对象:
class ToUniqueList(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, accumulator, element):
if element not in accumulator:
accumulator.append(element)
return accumulator
def merge_accumulators(self, accumulators):
return list(set(accumulators))
def extract_output(self, accumulator):
return accumulator
def get_list_of_dates(pcoll):
return (pcoll
| 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))
我做错了吗?最好的方法是什么?
谢谢。
无法直接获取 PCollection
的内容 - Apache Beam 或 Dataflow 管道更像是应该进行哪些处理的查询计划,PCollection
是一个逻辑计划中的中间节点,而不是包含数据。主程序组装计划(管道)并启动它。
但是,最终您要尝试将数据写入按日期分片的 BigQuery 表。目前仅 in the Java SDK 支持此用例,并且仅适用于流式传输管道。
有关根据数据将数据写入多个目的地的更一般处理,请遵循 BEAM-92。
另见
我正在使用 Google Cloud Dataflow 和 Python SDK。
我愿意:
- 从主 PCollection 中获取唯一日期列表
- 遍历该列表中的日期以创建过滤的 PCollection(每个都有一个唯一的日期),并将每个过滤的 PCollection 写入其在 BigQuery 中按时间分区 table 的分区。
我怎样才能得到那个名单?在以下组合转换之后,我创建了一个 ListPCollectionView 对象,但我无法迭代该对象:
class ToUniqueList(beam.CombineFn):
def create_accumulator(self):
return []
def add_input(self, accumulator, element):
if element not in accumulator:
accumulator.append(element)
return accumulator
def merge_accumulators(self, accumulators):
return list(set(accumulators))
def extract_output(self, accumulator):
return accumulator
def get_list_of_dates(pcoll):
return (pcoll
| 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))
我做错了吗?最好的方法是什么?
谢谢。
无法直接获取 PCollection
的内容 - Apache Beam 或 Dataflow 管道更像是应该进行哪些处理的查询计划,PCollection
是一个逻辑计划中的中间节点,而不是包含数据。主程序组装计划(管道)并启动它。
但是,最终您要尝试将数据写入按日期分片的 BigQuery 表。目前仅 in the Java SDK 支持此用例,并且仅适用于流式传输管道。
有关根据数据将数据写入多个目的地的更一般处理,请遵循 BEAM-92。
另见