beam.dataframe.io.read_fwf 的数据流:缺少 Ptransforms

Dataflow with beam.dataframe.io.read_fwf : missing Ptransforms

我遇到了问题。我不知道如何修复它。

我有一个批处理模式的管道。我使用 read_fwf 方法读取文件:beam.dataframe.io.read_fwf;但是,后面的所有 PTransform 都将被忽略。我想知道为什么?

如您所见,我的管道最终有 1 个步骤:

但是,我的代码有以下管道:

#LOAD FILE RNS
elements = p | 'Load File' >> beam.dataframe.io.read_fwf('gs://my_bucket/files/202009.txt', header=None, colspec=col_definition, dtype=str, keep_default_na=False, encoding='ISO-8859-1')
  
#PREPARE VALUES (BULK INSERT)
Script_Values = elements | 'Prepare Bulk Insert' >> beam.ParDo(Prepare_Bulk_Insert())  
        
#GROUP ALL VALUES
Grouped_Values = Script_Values | 'Grouping values' >> beam.GroupByKey()

#BULK INSERT INTO POSTGRESQL GROUPING BASED
Inserted = Grouped_Values | 'Insert PostgreSQL' >> beam.Map(functionMapInsert)  

你知道我做错了什么吗?

此致, 朱利安诺

我认为问题与以下事实有关,正如您在代表文件内容的 Apache Beam documentation, beam.dataframe.io.read_fwf returns a deferred Beam dataframe 中看到的那样,而不是 PCollection.

您可以将 DataFrames 嵌入到管道中,并借助 apache_beam.dataframe.convert 模块中定义的函数在它们与 PCollection 之间进行转换。

SDK文档提供an example of this setup, fully described in Github.

我认为尝试 DataframeTransform as well, perhaps it is more suitable for being integrated in the pipeline with the help of a schema 定义是值得的。

关于最后一条建议,请考虑查看相关的 , especially the answer from @robertwb and the exceptional linked google slides document,我认为它会有所帮助。