需要将 Collection 转换为 Tablerow 的通用格式

Need generic format for converting Collection to Tablerow

我正在通过从存储桶中读取 CSV 文件并存储在 Big Query 中来进行转换

PCollection quotes = ....//读取数据并做转换

//写入 BQ 现有 table 有 2 列 "source" 和 "quote".

quotes.apply(
                MapElements.into(TypeDescriptor.of(TableRow.class))
                    .via(
                        (Quote elem) ->
                            new TableRow().set("source", elem.source).set("quote", elem.quote)))
            .apply(
                BigQueryIO.writeTableRows()
                    .to(tableSpecname)
                    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

我需要替换将 PCollection 转换为 TableRow 的代码,因为在某些情况下 table 列可能会有所不同,因此这个核心列名称将不起作用。

您只需在输入 PCollection 和 BigQuery 写入步骤之间添加 ParDo 步骤,然后添加 DoFn class 将数据格式化为 TableRow 以您想要的方式反对。

https://beam.apache.org/documentation/programming-guide/#pardo