数据流作业具有高数据新鲜度,事件因迟到而被丢弃

Dataflow job has high data freshness and events are dropped due to lateness

我在 DEV 环境中将 apache beam 管道部署到 GCP 数据流,一切运行良好。然后我将它部署到欧洲环境中的生产环境(具体来说 - 工作区域:europe-west1,工作人员位置:europe-west1-d),在那里我们获得高数据速度并且事情开始变得复杂。

我正在使用会话 window 将事件分组到会话中。会话密钥是 tenantId/visitorId,间隔为 30 分钟。我还使用触发器每 30 秒发出一次事件,以便在会话结束前释放事件(将它们写入 BigQuery)。

问题似乎发生在 EventToSession/GroupPairsByKey。在此步骤中,droppedDueToLateness 计数器下有数千个事件,并且 dataFreshness 不断增加(自从我部署它后增加)。这个之前的所有步骤运行良好,之后的所有步骤都受它影响,但似乎没有任何其他问题。

我查看了一些指标,发现 EventToSession/GroupPairsByKey 步骤每秒处理 10 万个密钥到 20 万个密钥(取决于一天中的时间),这对我来说似乎很多。 cpu 利用率没有超过 70%,我正在使用流媒体引擎。大部分时间的工作人员数量为 2。最大工作人员内存容量为 32GB,而最大工作人员内存使用量目前为 23GB。我正在使用 e2-standard-8 机器类型。

我没有任何热键,因为每个会话最多包含几十个事件。

我最大的怀疑是在 EventToSession/GroupPairsByKey 步骤中处理的大量密钥。但另一方面,会话通常与单个客户相关,因此 google 应该期望每秒处理这么多的键,不是吗?

想获得有关如何解决 dataFreshness 和事件 droppedDueToLateness 问题的建议。

添加生成会话的代码片段:

input = input.apply("SetEventTimestamp", WithTimestamps.of(event -> Instant.parse(getEventTimestamp(event))
                            .withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
                    .apply("SetKeyForRow", WithKeys.of(event -> getSessionKey(event))).setCoder(KvCoder.of(StringUtf8Coder.of(), input.getCoder()))
                    .apply("CreatingWindow", Window.<KV<String, TableRow>>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
                           .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
                           .discardingFiredPanes()
                           .withAllowedLateness(Duration.standardDays(30)))
                    .apply("GroupPairsByKey", GroupByKey.create())
                    .apply("CreateCollectionOfValuesOnly", Values.create())
                    .apply("FlattenTheValues", Flatten.iterables());

经过一些研究,我发现了以下内容:

  • 关于不断增加数据的新鲜度:只要允许迟到的数据到达会话 window,特定的 window 就会保留在内存中。这意味着允许延迟 30 天的数据将使每个会话在内存中至少保留 30 天,这显然会使系统过载。此外,我发现我们有一些 ever-lasting 会话由机器人访问我们正在监控的网站并采取行动。这些机器人可以永远保持会话,这也会使系统过载。解决方案是将允许的迟到时间减少到 2 天并使用 bounded sessions(寻找“有界会话”)。
  • 关于因迟到而丢弃的事件:这些事件在到达时属于过期的 window,例如 window 水印已经结束(请参阅文档 droppedDueToLateness here)。这些事件在会话 window 函数之后的第一个 GroupByKey 中被删除,以后无法处理。我们不想丢弃任何迟到的数据,所以解决方案是在每个事件进入会话部分之前检查它的时间戳,并仅将不会被丢弃的事件流式传输到会话部分 - 满足此条件的事件:event_timestamp >= event_arrival_time - (gap_duration + allowed_lateness)。其余部分将在没有会话数据的情况下写入 BigQuery(显然,如果事件的时间戳在 event_arrival_time - (gap_duration + allowed_lateness) 之前,即使存在该事件属于的实时会话,apache beam 也会丢弃一个事件...)

p.s - 在 bounded sessions 部分,他演示了如何实现有时间限制的会话,我相信他有一个错误,允许会话超出提供的最大大小。一旦会话超过最大大小,就可以发送与该会话相交且在会话之前的延迟数据,从而使会话的开始时间更早,从而扩展会话。此外,一旦会话超过最大大小,就不能添加属于它但不扩展它的事件。

为了解决这个问题,我切换了 current window 跨度和 if-statement 的顺序并编辑了 if-statement (检查会话最大大小的那个)在 window 跨越部分的 mergeWindows 函数中,因此会话不能超过最大大小,只能添加不超过最大大小的数据。这是我的实现:

public void mergeWindows(MergeContext c) throws Exception {
        List<IntervalWindow> sortedWindows = new ArrayList<>();
        for (IntervalWindow window : c.windows()) {
            sortedWindows.add(window);
        }
        Collections.sort(sortedWindows);
        List<MergeCandidate> merges = new ArrayList<>();
        MergeCandidate current = new MergeCandidate();

        for (IntervalWindow window : sortedWindows) {
            MergeCandidate next = new MergeCandidate(window);
            if (current.intersects(window)) {
                if ((current.union == null || new Duration(current.union.start(), window.end()).getMillis() <= maxSize.plus(gapDuration).getMillis())) {
                    current.add(window);
                    continue;
                }
            }
            merges.add(current);
            current = next;
        }
        merges.add(current);
        for (MergeCandidate merge : merges) {
            merge.apply(c);
        }
    }