如何将流数据与 Dataflow/Beam 中的大型历史数据集相结合

How to combine streaming data with large history data set in Dataflow/Beam

我正在研究通过 Google Dataflow/Apache Beam 处理来自 Web 用户会话的日志,并且需要将用户进入(流式传输)时的日志与上次用户会话的历史记录相结合月.

我看过以下方法:

  1. 使用 30 天固定 window: 很可能大 window 适合内存,我不需要更新用户历史,只需参考它
  2. 使用 CoGroupByKey 连接两个数据集,但是这两个数据集必须具有相同的 window 大小 (https://cloud.google.com/dataflow/model/group-by-key#join),这在我的情况下是不正确的(24 小时对 30 天)
  3. 使用 Side Input 检索 processElement(ProcessContext processContext)
  4. 中给定 element 的用户会话历史记录

我的理解是通过.withSideInputs(pCollectionView)加载的数据需要适合内存。我知道我可以将单个用户的所有会话历史记录放入内存,但不能将 all 会话历史记录。

我的问题是,是否有办法从仅与当前用户会话相关的辅助输入中 load/stream 数据?

我正在设想一个 parDo 函数,它可以通过指定用户 ID 从侧面输入加载用户的历史会话。但是只有当前用户的历史会话才能放入内存;通过侧输入加载 所有 个历史会话会太大。

一些伪代码来说明:

public static class MetricFn extends DoFn<LogLine, String> {

    final PCollectionView<Map<String, Iterable<LogLine>>> pHistoryView;

    public MetricFn(PCollectionView<Map<String, Iterable<LogLine>>> historyView) {
        this.pHistoryView = historyView;
    }

    @Override
    public void processElement(ProcessContext processContext) throws Exception {
        Map<String, Iterable<LogLine>> historyLogData = processContext.sideInput(pHistoryView);

        final LogLine currentLogLine = processContext.element();
        final Iterable<LogLine> userHistory = historyLogData.get(currentLogLine.getUserId());
        final String outputMetric = calculateMetricWithUserHistory(currentLogLine, userHistory);
        processContext.output(outputMetric);
    }
}

目前没有一种方法可以在流式传输中访问每个键的侧输入,但它肯定会像您描述的那样有用,我们正在考虑实现它。

一种可能的解决方法是使用辅助输入来分发指向实际会话历史记录的指针。生成 24 小时会话历史记录的代码可以将它们上传到 GCS/BigQuery/etc,然后将这些位置作为辅助输入发送到加入代码。