Apache beam:以编程方式创建分区表
Apache beam : Programatically create partitioned tables
我正在编写一个云数据流,它从 Pubsub 读取消息并将其存储到 BigQuery 中。我想使用分区 table(按日期),并且我正在使用与消息关联的 Timestamp
来确定消息应该进入哪个分区。下面是我的代码:
BigQueryIO.writeTableRows()
.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
private static final long serialVersionUID = 1L;
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
log.info("Row value : {}", value.getValue());
Instant timestamp = value.getTimestamp();
String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
TableDestination td = new TableDestination(
"<project>:<dataset>.<table>" + "$" + partition, null);
log.info("Table Destination : {}", td);
return td;
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchema(tableSchema);
当我部署数据流时,我可以在 Stackdriver 中看到日志语句,但是,消息没有插入到 BigQuery tables 中,我收到以下错误:
Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity: "WARNING"
因此,看起来无法创建 table,导致插入失败。我是否需要更改数据流定义才能使其正常工作?如果没有,是否有任何其他方法以编程方式创建分区 tables?
我正在使用 Apache Beam 2.0.0。
这是 a bug in BigQueryIO 并且已在 Beam 2.2 中修复。您可以使用 Beam 的快照版本,或者等到 2.2 版最终确定(发布过程目前正在进行中)。
我正在编写一个云数据流,它从 Pubsub 读取消息并将其存储到 BigQuery 中。我想使用分区 table(按日期),并且我正在使用与消息关联的 Timestamp
来确定消息应该进入哪个分区。下面是我的代码:
BigQueryIO.writeTableRows()
.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
private static final long serialVersionUID = 1L;
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
log.info("Row value : {}", value.getValue());
Instant timestamp = value.getTimestamp();
String partition = DateTimeFormat.forPattern("yyyyMMdd").print(timestamp);
TableDestination td = new TableDestination(
"<project>:<dataset>.<table>" + "$" + partition, null);
log.info("Table Destination : {}", td);
return td;
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withSchema(tableSchema);
当我部署数据流时,我可以在 Stackdriver 中看到日志语句,但是,消息没有插入到 BigQuery tables 中,我收到以下错误:
Request failed with code 400, will NOT retry: https://www.googleapis.com/bigquery/v2/projects/<project_id>/datasets/<dataset_id>/tables
severity: "WARNING"
因此,看起来无法创建 table,导致插入失败。我是否需要更改数据流定义才能使其正常工作?如果没有,是否有任何其他方法以编程方式创建分区 tables?
我正在使用 Apache Beam 2.0.0。
这是 a bug in BigQueryIO 并且已在 Beam 2.2 中修复。您可以使用 Beam 的快照版本,或者等到 2.2 版最终确定(发布过程目前正在进行中)。