如何在 Google Dataflow 中添加一列以查询结果
How to add a column to query results in Google Dataflow
我正在尝试从 BigQuery 读取查询,然后使用 Kotlin 中的 Apache Beam / Dataflow 我想添加一个包含当前日期作为时间戳的列。我不想在查询本身内部执行此操作,因为我想将这段代码重复用于大量查询,而且它看起来是一个更好的设计。
这是我写的流水线代码:
val pipeline = Pipeline.create(options)
.apply("Retrieve query", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
.apply("Add date", ParDo.of(AddDate()))
.apply("Store data", BigQueryIO.writeTableRows().withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.to(TableReference().setProjectId(gcpProject).setDatasetId(datasetId).setTableId(tableId))
出于某种原因,它没有从 Add date
转换中前进。
这是最有可能出现错误/错误的代码:
class AddDate : DoFn<TableRow, TableRow>() {
@ProcessElement
fun processElement(context: ProcessContext) {
val tableRow = context.element() as TableRow
tableRow.set("process_date", Instant.now())
context.output(tableRow)
}
}
我也在 processElement
中尝试使用此代码,但仍然无效。
context.outputWithTimestamp(context.element(), Instant.now())
错误如下:
Input values must not be mutated in any way.
问题已通过使用新对象并注意用于日期的类型(对于 DATE
或 TIMESTAMP
类型)得到解决
@ProcessElement
fun processElement(context: ProcessContext) {
val tableRow = TableRow()
tableRow.set("process_date", Instant.now().toString())
val input = context.element() as TableRow
input.keys.forEach { tableRow.set(it, input[it]) }
context.output(tableRow)
}
我正在尝试从 BigQuery 读取查询,然后使用 Kotlin 中的 Apache Beam / Dataflow 我想添加一个包含当前日期作为时间戳的列。我不想在查询本身内部执行此操作,因为我想将这段代码重复用于大量查询,而且它看起来是一个更好的设计。
这是我写的流水线代码:
val pipeline = Pipeline.create(options)
.apply("Retrieve query", BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
.apply("Add date", ParDo.of(AddDate()))
.apply("Store data", BigQueryIO.writeTableRows().withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.to(TableReference().setProjectId(gcpProject).setDatasetId(datasetId).setTableId(tableId))
出于某种原因,它没有从 Add date
转换中前进。
这是最有可能出现错误/错误的代码:
class AddDate : DoFn<TableRow, TableRow>() {
@ProcessElement
fun processElement(context: ProcessContext) {
val tableRow = context.element() as TableRow
tableRow.set("process_date", Instant.now())
context.output(tableRow)
}
}
我也在 processElement
中尝试使用此代码,但仍然无效。
context.outputWithTimestamp(context.element(), Instant.now())
错误如下:
Input values must not be mutated in any way.
问题已通过使用新对象并注意用于日期的类型(对于 DATE
或 TIMESTAMP
类型)得到解决
@ProcessElement
fun processElement(context: ProcessContext) {
val tableRow = TableRow()
tableRow.set("process_date", Instant.now().toString())
val input = context.element() as TableRow
input.keys.forEach { tableRow.set(it, input[it]) }
context.output(tableRow)
}