需要将 Collection 转换为 Tablerow 的通用格式
Need generic format for converting Collection to Tablerow
我正在通过从存储桶中读取 CSV 文件并存储在 Big Query 中来进行转换
PCollection quotes = ....//读取数据并做转换
//写入 BQ 现有 table 有 2 列 "source" 和 "quote".
quotes.apply(
MapElements.into(TypeDescriptor.of(TableRow.class))
.via(
(Quote elem) ->
new TableRow().set("source", elem.source).set("quote", elem.quote)))
.apply(
BigQueryIO.writeTableRows()
.to(tableSpecname)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
我需要替换将 PCollection 转换为 TableRow 的代码,因为在某些情况下 table 列可能会有所不同,因此这个核心列名称将不起作用。
您只需在输入 PCollection
和 BigQuery 写入步骤之间添加 ParDo
步骤,然后添加 DoFn
class 将数据格式化为 TableRow
以您想要的方式反对。
https://beam.apache.org/documentation/programming-guide/#pardo
我正在通过从存储桶中读取 CSV 文件并存储在 Big Query 中来进行转换
PCollection quotes = ....//读取数据并做转换
//写入 BQ 现有 table 有 2 列 "source" 和 "quote".
quotes.apply(
MapElements.into(TypeDescriptor.of(TableRow.class))
.via(
(Quote elem) ->
new TableRow().set("source", elem.source).set("quote", elem.quote)))
.apply(
BigQueryIO.writeTableRows()
.to(tableSpecname)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
我需要替换将 PCollection 转换为 TableRow 的代码,因为在某些情况下 table 列可能会有所不同,因此这个核心列名称将不起作用。
您只需在输入 PCollection
和 BigQuery 写入步骤之间添加 ParDo
步骤,然后添加 DoFn
class 将数据格式化为 TableRow
以您想要的方式反对。
https://beam.apache.org/documentation/programming-guide/#pardo