Creating/Writing to Sharded (Dated) BigQuery table via Google Cloud Dataflow
Is there a simple, easy-to-follow example of how to configure a streaming-mode Dataflow pipeline to write each window to a separate BigQuery table (and create the table if necessary)?
I.e. table_20160701, table_20160702, etc.
Sample code:
```java
PCollection<TableRow> quotes = ...;  // streaming source of TableRow elements

quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
      .apply(BigQueryIO.Write
        .named("Write")
        .withSchema(schema)
        .to(new SerializableFunction<BoundedWindow, String>() {
          public String apply(BoundedWindow window) {
            // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
            String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
                .withZone(DateTimeZone.UTC)
                .print(((IntervalWindow) window).start());
            return "my-project:output.output_table_" + dayString;
          }
        }));
```
Taken from here:
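For the "create one, if necessary" part, `BigQueryIO.Write` also accepts create and write dispositions, which can be combined with the per-window table-name function shown above. Below is a minimal sketch assuming the Dataflow Java SDK 1.x API used in the snippet; the project/dataset/table prefix and the two schema fields are placeholders for illustration, not names taken from the question.

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.CalendarWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;

import java.util.Arrays;

public class DailyTableWrite {

  // Writes each daily window of rows to its own dated BigQuery table,
  // creating the table on first use. Schema fields below are placeholders.
  static void writeDailyTables(PCollection<TableRow> quotes) {
    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("symbol").setType("STRING"),
        new TableFieldSchema().setName("price").setType("FLOAT")));

    quotes
        .apply(Window.<TableRow>into(CalendarWindows.days(1)))
        .apply(BigQueryIO.Write
            .named("Write")
            .withSchema(schema)
            // CREATE_IF_NEEDED creates each dated table the first time a
            // window writes to it; WRITE_APPEND appends on later writes.
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .to(new SerializableFunction<BoundedWindow, String>() {
              @Override
              public String apply(BoundedWindow window) {
                // Derive the table suffix from the start of the daily window.
                String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
                    .withZone(DateTimeZone.UTC)
                    .print(((IntervalWindow) window).start());
                return "my-project:output.output_table_" + dayString;
              }
            }));
  }
}
```

With `CREATE_IF_NEEDED` the sink creates `output_table_yyyy_MM_dd` the first time a day's window writes to it, and `WRITE_APPEND` keeps later panes of the same window from overwriting rows written earlier.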