具有 Google 数据流的长期状态
Long lived state with Google Dataflow
只是想了解这里的编程模型。场景是我正在使用 Pub/Sub + Dataflow 来检测网络论坛的分析。我有一个来自 Pub/Sub 的数据流,看起来像:
ID | TS | EventType
1 | 1 | Create
1 | 2 | Comment
2 | 2 | Create
1 | 4 | Comment
我希望最终得到一个来自 Dataflow 的流,如下所示:
ID | TS | num_comments
1 | 1 | 0
1 | 2 | 1
2 | 2 | 0
1 | 4 | 2
我希望将此汇总到 运行 的作业作为流处理,随着新事件的到来填充新的计数。我的问题是,作业存储当前主题 ID 和评论数的状态?假设主题可以存在多年。目前的想法是:
- 将主题 ID 的 'current' 条目写入 BigTable,并在 DoFn 查询中输入主题 ID 的当前评论数。即使在我写这篇文章时,我也不是粉丝。
- 以某种方式使用辅助输入?看起来也许这就是答案,但如果是这样我并不完全理解。
- 使用全局 window 设置流作业,每次获取记录时都会触发一个触发器,并依靠 Dataflow 将整个窗格历史记录保存在某个地方。 (无限存储要求?)
编辑:澄清一下,我不会在实施这三种策略中的任何一种时遇到任何问题,或者采用一百万种不同的其他方法,我更感兴趣的是什么是 最好的 使用 Dataflow 的方式。什么对失败最有弹性,必须重新处理历史记录以进行回填等。
EDIT2:目前数据流服务存在一个错误,如果将输入添加到展平转换,更新会失败,这意味着如果您对一项工作,包括向展平操作添加内容。
由于 BigQuery 不支持覆盖行,解决此问题的一种方法是将事件写入 BigQuery,然后使用 COUNT:
查询数据
SELECT ID,COUNT(num_comments) 来自 Table GROUP BY ID;
在将条目写入 BigQuery 之前,您还可以在 Dataflow 中对 num_comments 进行每window 次聚合;上面的查询将继续工作。
您应该能够使用触发器和组合来完成此操作。
PCollection<ID> comments = /* IDs from the source */;
PCollection<KV<ID, Long>> commentCounts = comments
// Produce speculative results by triggering as data comes in.
// Note that this won't trigger after *every* element, but it will
// trigger relatively quickly (as the system divides incoming data
// into work units). You could also throttle this with something
// like:
// AfterProcessingTime.pastFirstElementInPane()
// .plusDelayOf(Duration.standardMinutes(5))
// which will produce output every 5 minutes
.apply(Window.triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes())
// Count the occurrences of each ID
.apply(Count.perElement());
// Produce an output String -- in your use case you'd want to produce
// a row and write it to the appropriate source
commentCounts.apply(new DoFn<KV<ID, Long>, String>() {
public void processElement(ProcessContext c) {
KV<ID, Long> element = c.element();
// This includes details about the pane of the window being
// processed, and including a strictly increasing index of the
// number of panes that have been produced for the key.
PaneInfo pane = c.pane();
return element.key() + " | " + pane.getIndex() + " | " + element.value();
}
});
根据您的数据,您还可以从来源读取整个评论,提取 ID,然后使用 Count.perKey()
获取每个 ID 的计数。如果你想要一个更复杂的组合,你可以看看定义一个自定义 CombineFn
并使用 Combine.perKey
.
只是想了解这里的编程模型。场景是我正在使用 Pub/Sub + Dataflow 来检测网络论坛的分析。我有一个来自 Pub/Sub 的数据流,看起来像:
ID | TS | EventType
1 | 1 | Create
1 | 2 | Comment
2 | 2 | Create
1 | 4 | Comment
我希望最终得到一个来自 Dataflow 的流,如下所示:
ID | TS | num_comments
1 | 1 | 0
1 | 2 | 1
2 | 2 | 0
1 | 4 | 2
我希望将此汇总到 运行 的作业作为流处理,随着新事件的到来填充新的计数。我的问题是,作业存储当前主题 ID 和评论数的状态?假设主题可以存在多年。目前的想法是:
- 将主题 ID 的 'current' 条目写入 BigTable,并在 DoFn 查询中输入主题 ID 的当前评论数。即使在我写这篇文章时,我也不是粉丝。
- 以某种方式使用辅助输入?看起来也许这就是答案,但如果是这样我并不完全理解。
- 使用全局 window 设置流作业,每次获取记录时都会触发一个触发器,并依靠 Dataflow 将整个窗格历史记录保存在某个地方。 (无限存储要求?)
编辑:澄清一下,我不会在实施这三种策略中的任何一种时遇到任何问题,或者采用一百万种不同的其他方法,我更感兴趣的是什么是 最好的 使用 Dataflow 的方式。什么对失败最有弹性,必须重新处理历史记录以进行回填等。
EDIT2:目前数据流服务存在一个错误,如果将输入添加到展平转换,更新会失败,这意味着如果您对一项工作,包括向展平操作添加内容。
由于 BigQuery 不支持覆盖行,解决此问题的一种方法是将事件写入 BigQuery,然后使用 COUNT:
查询数据SELECT ID,COUNT(num_comments) 来自 Table GROUP BY ID;
在将条目写入 BigQuery 之前,您还可以在 Dataflow 中对 num_comments 进行每window 次聚合;上面的查询将继续工作。
您应该能够使用触发器和组合来完成此操作。
PCollection<ID> comments = /* IDs from the source */;
PCollection<KV<ID, Long>> commentCounts = comments
// Produce speculative results by triggering as data comes in.
// Note that this won't trigger after *every* element, but it will
// trigger relatively quickly (as the system divides incoming data
// into work units). You could also throttle this with something
// like:
// AfterProcessingTime.pastFirstElementInPane()
// .plusDelayOf(Duration.standardMinutes(5))
// which will produce output every 5 minutes
.apply(Window.triggering(
Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes())
// Count the occurrences of each ID
.apply(Count.perElement());
// Produce an output String -- in your use case you'd want to produce
// a row and write it to the appropriate source
commentCounts.apply(new DoFn<KV<ID, Long>, String>() {
public void processElement(ProcessContext c) {
KV<ID, Long> element = c.element();
// This includes details about the pane of the window being
// processed, and including a strictly increasing index of the
// number of panes that have been produced for the key.
PaneInfo pane = c.pane();
return element.key() + " | " + pane.getIndex() + " | " + element.value();
}
});
根据您的数据,您还可以从来源读取整个评论,提取 ID,然后使用 Count.perKey()
获取每个 ID 的计数。如果你想要一个更复杂的组合,你可以看看定义一个自定义 CombineFn
并使用 Combine.perKey
.