是否可以根据 window 元素的时间戳动态生成 BigQuery table 名称?
Is it possible to dynamically generate BigQuery table names based on the timestamps of the elements of a window?
例如,如果我有一个从 PubSub 读取 5 分钟 window 的 Dataflow 流作业,我知道如果我将时间戳过去两天分配给一个元素,将会有一个 window 与此元素,如果我使用 BigQueryIO.java 中描述的每天 table 输出到 BigQuery 的示例,作业将在 BigQuery table 中写入过去两天的元素实际日期。
我想用 window 元素的时间戳而不是当前 window 的时间将过去的元素写入 BigQuery tables,这可能吗?
现在我正在按照 DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO 中描述的示例进行操作。java:
PCollection<TableRow> quotes = ...
quotes.apply(Window.<TableRow>info(CalendarWindows.days(1)))
.apply(BigQueryIO.Write
.named("Write")
.withSchema(schema)
.to(new SerializableFunction<BoundedWindow, String>() {
public String apply(BoundedWindow window) {
String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").parseDateTime(
((DaysWindow) window).getStartDate());
return "my-project:output.output_table_" + dayString;
}
}));
如果我理解正确,您希望确保 BigQuery table 是根据元素(引号)的固有时间戳创建的,而不是管道 [=45= 时的挂钟时间]s.
TL;DR 代码应该已经完成了您想要的操作;如果不是,请 post 更多详细信息。
更长的解释:
Dataflow 处理的一项关键创新是 事件时间处理 。这意味着 Dataflow 中的数据处理几乎与 处理 发生的时间完全分离 - 重要的是 正在处理的事件 发生的时间。这是在批处理或流数据源上启用与 运行 完全相同的代码的关键要素(例如,使用处理历史点击日志的相同代码处理实时用户点击事件)。它还可以灵活处理迟到的数据。
请参阅The world beyond batch, the section "Event time vs. processing time" for a description of this aspect of Dataflow's processing model (the whole article is very much worth a read). For a deeper description, see the VLDB paper. This is also described in a more user-facing way in the official documentation on windowing and triggers。
因此,不存在 "current window" 因为管道可能同时处理许多不同的事件,这些事件发生在不同的时间并且属于不同的 windows。事实上,正如 VLDB 论文所指出的,数据流管道执行的重要部分之一是 "group elements by window".
在您展示的管道中,我们将使用记录上的 provided timestamps 将您要写入 BigQuery 的记录分组到 windows,并将每个 window 写入其自己的 table,如有必要,为新遇到的 windows 创建 table。如果延迟数据到达 window(请参阅有关 windowing 和触发器的文档以讨论延迟数据),我们将追加到已经存在的 table.
上述代码不再适用于我。 Google 文档中有一个 updated example,尽管其中 DaysWindow 被 IntervalWindow 替换,这对我有用:
PCollection<TableRow> quotes = ...
quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
.apply(BigQueryIO.Write
.named("Write")
.withSchema(schema)
.to(new SerializableFunction<BoundedWindow, String>() {
public String apply(BoundedWindow window) {
// The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
.withZone(DateTimeZone.UTC)
.print(((IntervalWindow) window).start());
return "my-project:output.output_table_" + dayString;
}
}));
例如,如果我有一个从 PubSub 读取 5 分钟 window 的 Dataflow 流作业,我知道如果我将时间戳过去两天分配给一个元素,将会有一个 window 与此元素,如果我使用 BigQueryIO.java 中描述的每天 table 输出到 BigQuery 的示例,作业将在 BigQuery table 中写入过去两天的元素实际日期。
我想用 window 元素的时间戳而不是当前 window 的时间将过去的元素写入 BigQuery tables,这可能吗?
现在我正在按照 DataflowJavaSDK/sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO 中描述的示例进行操作。java:
PCollection<TableRow> quotes = ...
quotes.apply(Window.<TableRow>info(CalendarWindows.days(1)))
.apply(BigQueryIO.Write
.named("Write")
.withSchema(schema)
.to(new SerializableFunction<BoundedWindow, String>() {
public String apply(BoundedWindow window) {
String dayString = DateTimeFormat.forPattern("yyyy_MM_dd").parseDateTime(
((DaysWindow) window).getStartDate());
return "my-project:output.output_table_" + dayString;
}
}));
如果我理解正确,您希望确保 BigQuery table 是根据元素(引号)的固有时间戳创建的,而不是管道 [=45= 时的挂钟时间]s.
TL;DR 代码应该已经完成了您想要的操作;如果不是,请 post 更多详细信息。
更长的解释: Dataflow 处理的一项关键创新是 事件时间处理 。这意味着 Dataflow 中的数据处理几乎与 处理 发生的时间完全分离 - 重要的是 正在处理的事件 发生的时间。这是在批处理或流数据源上启用与 运行 完全相同的代码的关键要素(例如,使用处理历史点击日志的相同代码处理实时用户点击事件)。它还可以灵活处理迟到的数据。
请参阅The world beyond batch, the section "Event time vs. processing time" for a description of this aspect of Dataflow's processing model (the whole article is very much worth a read). For a deeper description, see the VLDB paper. This is also described in a more user-facing way in the official documentation on windowing and triggers。
因此,不存在 "current window" 因为管道可能同时处理许多不同的事件,这些事件发生在不同的时间并且属于不同的 windows。事实上,正如 VLDB 论文所指出的,数据流管道执行的重要部分之一是 "group elements by window".
在您展示的管道中,我们将使用记录上的 provided timestamps 将您要写入 BigQuery 的记录分组到 windows,并将每个 window 写入其自己的 table,如有必要,为新遇到的 windows 创建 table。如果延迟数据到达 window(请参阅有关 windowing 和触发器的文档以讨论延迟数据),我们将追加到已经存在的 table.
上述代码不再适用于我。 Google 文档中有一个 updated example,尽管其中 DaysWindow 被 IntervalWindow 替换,这对我有用:
PCollection<TableRow> quotes = ...
quotes.apply(Window.<TableRow>into(CalendarWindows.days(1)))
.apply(BigQueryIO.Write
.named("Write")
.withSchema(schema)
.to(new SerializableFunction<BoundedWindow, String>() {
public String apply(BoundedWindow window) {
// The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
.withZone(DateTimeZone.UTC)
.print(((IntervalWindow) window).start());
return "my-project:output.output_table_" + dayString;
}
}));