具有 Google 数据流的长期状态

Long lived state with Google Dataflow

只是想了解这里的编程模型。场景是我正在使用 Pub/Sub + Dataflow 来检测网络论坛的分析。我有一个来自 Pub/Sub 的数据流,看起来像:

ID | TS | EventType
1  | 1  | Create
1  | 2  | Comment
2  | 2  | Create
1  | 4  | Comment

我希望最终得到一个来自 Dataflow 的流,如下所示:

ID | TS | num_comments
1  | 1  | 0
1  | 2  | 1
2  | 2  | 0
1  | 4  | 2

我希望将此汇总到 运行 的作业作为流处理,随着新事件的到来填充新的计数。我的问题是,作业存储当前主题 ID 和评论数的状态?假设主题可以存在多年。目前的想法是:

编辑:澄清一下,我不会在实施这三种策略中的任何一种时遇到任何问题,或者采用一百万种不同的其他方法,我更感兴趣的是什么是 最好的 使用 Dataflow 的方式。什么对失败最有弹性,必须重新处理历史记录以进行回填等。

EDIT2:目前数据流服务存在一个错误,如果将输入添加到展平转换,更新会失败,这意味着如果您对一项工作,包括向展平操作添加内容。

由于 BigQuery 不支持覆盖行,解决此问题的一种方法是将事件写入 BigQuery,然后使用 COUNT:

查询数据

SELECT ID,COUNT(num_comments) 来自 Table GROUP BY ID;

在将条目写入 BigQuery 之前,您还可以在 Dataflow 中对 num_comments 进行每window 次聚合;上面的查询将继续工作。

您应该能够使用触发器和组合来完成此操作。

PCollection<ID> comments = /* IDs from the source */;
PCollection<KV<ID, Long>> commentCounts = comments
    // Produce speculative results by triggering as data comes in.
    // Note that this won't trigger after *every* element, but it will
    // trigger relatively quickly (as the system divides incoming data
    // into work units). You could also throttle this with something
    // like:
    //   AfterProcessingTime.pastFirstElementInPane()
    //     .plusDelayOf(Duration.standardMinutes(5))
    // which will produce output every 5 minutes
    .apply(Window.triggering(
            Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .accumulatingFiredPanes())
    // Count the occurrences of each ID
    .apply(Count.perElement());

// Produce an output String -- in your use case you'd want to produce
// a row and write it to the appropriate source  
commentCounts.apply(new DoFn<KV<ID, Long>, String>() {
  public void processElement(ProcessContext c) {
    KV<ID, Long> element = c.element();
    // This includes details about the pane of the window being
    // processed, and including a strictly increasing index of the
    // number of panes that have been produced for the key.        
    PaneInfo pane = c.pane();
    return element.key() + " | " + pane.getIndex() + " | " + element.value(); 
  }
});

根据您的数据,您还可以从来源读取整个评论,提取 ID,然后使用 Count.perKey() 获取每个 ID 的计数。如果你想要一个更复杂的组合,你可以看看定义一个自定义 CombineFn 并使用 Combine.perKey.