如何在 Google Dataflow 中添加一列以查询结果

How to add a column to query results in Google Dataflow

我正在尝试从 BigQuery 读取查询,然后使用 Kotlin 中的 Apache Beam / Dataflow 我想添加一个包含当前日期作为时间戳的列。我不想在查询本身内部执行此操作,因为我想将这段代码重复用于大量查询,而且它看起来是一个更好的设计。

这是我写的流水线代码:

val pipeline = Pipeline.create(options)
  .apply("Retrieve query", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
  .apply("Add date", ParDo.of(AddDate()))
  .apply("Store data", BigQueryIO.writeTableRows().withSchema(tableSchema)
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
    .to(TableReference().setProjectId(gcpProject).setDatasetId(datasetId).setTableId(tableId))

出于某种原因,它没有从 Add date 转换中前进。 这是最有可能出现错误/错误的代码:

class AddDate : DoFn<TableRow, TableRow>() {

    @ProcessElement
    fun processElement(context: ProcessContext) {
        val tableRow = context.element() as TableRow
        tableRow.set("process_date", Instant.now())
        context.output(tableRow)
    }
}

我也在 processElement 中尝试使用此代码,但仍然无效。

context.outputWithTimestamp(context.element(), Instant.now())

错误如下:

Input values must not be mutated in any way.

问题已通过使用新对象并注意用于日期的类型(对于 DATETIMESTAMP 类型)得到解决

@ProcessElement
fun processElement(context: ProcessContext) {
    val tableRow = TableRow()
    tableRow.set("process_date", Instant.now().toString())
    val input = context.element() as TableRow
    input.keys.forEach { tableRow.set(it, input[it]) }
    context.output(tableRow)
}