数据流作业具有高数据新鲜度,事件因迟到而被丢弃
Dataflow job has high data freshness and events are dropped due to lateness
我在 DEV 环境中将 apache beam 管道部署到 GCP 数据流,一切运行良好。然后我将它部署到欧洲环境中的生产环境(具体来说 - 工作区域:europe-west1
,工作人员位置:europe-west1-d
),在那里我们获得高数据速度并且事情开始变得复杂。
我正在使用会话 window 将事件分组到会话中。会话密钥是 tenantId/visitorId
,间隔为 30 分钟。我还使用触发器每 30 秒发出一次事件,以便在会话结束前释放事件(将它们写入 BigQuery)。
问题似乎发生在 EventToSession/GroupPairsByKey
。在此步骤中,droppedDueToLateness
计数器下有数千个事件,并且 dataFreshness
不断增加(自从我部署它后增加)。这个之前的所有步骤运行良好,之后的所有步骤都受它影响,但似乎没有任何其他问题。
我查看了一些指标,发现 EventToSession/GroupPairsByKey
步骤每秒处理 10 万个密钥到 20 万个密钥(取决于一天中的时间),这对我来说似乎很多。 cpu 利用率没有超过 70%,我正在使用流媒体引擎。大部分时间的工作人员数量为 2。最大工作人员内存容量为 32GB,而最大工作人员内存使用量目前为 23GB。我正在使用 e2-standard-8
机器类型。
我没有任何热键,因为每个会话最多包含几十个事件。
我最大的怀疑是在 EventToSession/GroupPairsByKey
步骤中处理的大量密钥。但另一方面,会话通常与单个客户相关,因此 google 应该期望每秒处理这么多的键,不是吗?
想获得有关如何解决 dataFreshness
和事件 droppedDueToLateness
问题的建议。
添加生成会话的代码片段:
input = input.apply("SetEventTimestamp", WithTimestamps.of(event -> Instant.parse(getEventTimestamp(event))
.withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
.apply("SetKeyForRow", WithKeys.of(event -> getSessionKey(event))).setCoder(KvCoder.of(StringUtf8Coder.of(), input.getCoder()))
.apply("CreatingWindow", Window.<KV<String, TableRow>>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(30)))
.apply("GroupPairsByKey", GroupByKey.create())
.apply("CreateCollectionOfValuesOnly", Values.create())
.apply("FlattenTheValues", Flatten.iterables());
经过一些研究,我发现了以下内容:
- 关于不断增加数据的新鲜度:只要允许迟到的数据到达会话 window,特定的 window 就会保留在内存中。这意味着允许延迟 30 天的数据将使每个会话在内存中至少保留 30 天,这显然会使系统过载。此外,我发现我们有一些 ever-lasting 会话由机器人访问我们正在监控的网站并采取行动。这些机器人可以永远保持会话,这也会使系统过载。解决方案是将允许的迟到时间减少到 2 天并使用 bounded sessions(寻找“有界会话”)。
- 关于因迟到而丢弃的事件:这些事件在到达时属于过期的 window,例如 window 水印已经结束(请参阅文档
droppedDueToLateness
here)。这些事件在会话 window 函数之后的第一个 GroupByKey
中被删除,以后无法处理。我们不想丢弃任何迟到的数据,所以解决方案是在每个事件进入会话部分之前检查它的时间戳,并仅将不会被丢弃的事件流式传输到会话部分 - 满足此条件的事件:event_timestamp >= event_arrival_time - (gap_duration + allowed_lateness)
。其余部分将在没有会话数据的情况下写入 BigQuery(显然,如果事件的时间戳在 event_arrival_time - (gap_duration + allowed_lateness)
之前,即使存在该事件属于的实时会话,apache beam 也会丢弃一个事件...)
p.s - 在 bounded sessions 部分,他演示了如何实现有时间限制的会话,我相信他有一个错误,允许会话超出提供的最大大小。一旦会话超过最大大小,就可以发送与该会话相交且在会话之前的延迟数据,从而使会话的开始时间更早,从而扩展会话。此外,一旦会话超过最大大小,就不能添加属于它但不扩展它的事件。
为了解决这个问题,我切换了 current
window 跨度和 if-statement 的顺序并编辑了 if-statement (检查会话最大大小的那个)在 window 跨越部分的 mergeWindows
函数中,因此会话不能超过最大大小,只能添加不超过最大大小的数据。这是我的实现:
public void mergeWindows(MergeContext c) throws Exception {
List<IntervalWindow> sortedWindows = new ArrayList<>();
for (IntervalWindow window : c.windows()) {
sortedWindows.add(window);
}
Collections.sort(sortedWindows);
List<MergeCandidate> merges = new ArrayList<>();
MergeCandidate current = new MergeCandidate();
for (IntervalWindow window : sortedWindows) {
MergeCandidate next = new MergeCandidate(window);
if (current.intersects(window)) {
if ((current.union == null || new Duration(current.union.start(), window.end()).getMillis() <= maxSize.plus(gapDuration).getMillis())) {
current.add(window);
continue;
}
}
merges.add(current);
current = next;
}
merges.add(current);
for (MergeCandidate merge : merges) {
merge.apply(c);
}
}
我在 DEV 环境中将 apache beam 管道部署到 GCP 数据流,一切运行良好。然后我将它部署到欧洲环境中的生产环境(具体来说 - 工作区域:europe-west1
,工作人员位置:europe-west1-d
),在那里我们获得高数据速度并且事情开始变得复杂。
我正在使用会话 window 将事件分组到会话中。会话密钥是 tenantId/visitorId
,间隔为 30 分钟。我还使用触发器每 30 秒发出一次事件,以便在会话结束前释放事件(将它们写入 BigQuery)。
问题似乎发生在 EventToSession/GroupPairsByKey
。在此步骤中,droppedDueToLateness
计数器下有数千个事件,并且 dataFreshness
不断增加(自从我部署它后增加)。这个之前的所有步骤运行良好,之后的所有步骤都受它影响,但似乎没有任何其他问题。
我查看了一些指标,发现 EventToSession/GroupPairsByKey
步骤每秒处理 10 万个密钥到 20 万个密钥(取决于一天中的时间),这对我来说似乎很多。 cpu 利用率没有超过 70%,我正在使用流媒体引擎。大部分时间的工作人员数量为 2。最大工作人员内存容量为 32GB,而最大工作人员内存使用量目前为 23GB。我正在使用 e2-standard-8
机器类型。
我没有任何热键,因为每个会话最多包含几十个事件。
我最大的怀疑是在 EventToSession/GroupPairsByKey
步骤中处理的大量密钥。但另一方面,会话通常与单个客户相关,因此 google 应该期望每秒处理这么多的键,不是吗?
想获得有关如何解决 dataFreshness
和事件 droppedDueToLateness
问题的建议。
添加生成会话的代码片段:
input = input.apply("SetEventTimestamp", WithTimestamps.of(event -> Instant.parse(getEventTimestamp(event))
.withAllowedTimestampSkew(new Duration(Long.MAX_VALUE)))
.apply("SetKeyForRow", WithKeys.of(event -> getSessionKey(event))).setCoder(KvCoder.of(StringUtf8Coder.of(), input.getCoder()))
.apply("CreatingWindow", Window.<KV<String, TableRow>>into(Sessions.withGapDuration(Duration.standardMinutes(30)))
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(30))))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardDays(30)))
.apply("GroupPairsByKey", GroupByKey.create())
.apply("CreateCollectionOfValuesOnly", Values.create())
.apply("FlattenTheValues", Flatten.iterables());
经过一些研究,我发现了以下内容:
- 关于不断增加数据的新鲜度:只要允许迟到的数据到达会话 window,特定的 window 就会保留在内存中。这意味着允许延迟 30 天的数据将使每个会话在内存中至少保留 30 天,这显然会使系统过载。此外,我发现我们有一些 ever-lasting 会话由机器人访问我们正在监控的网站并采取行动。这些机器人可以永远保持会话,这也会使系统过载。解决方案是将允许的迟到时间减少到 2 天并使用 bounded sessions(寻找“有界会话”)。
- 关于因迟到而丢弃的事件:这些事件在到达时属于过期的 window,例如 window 水印已经结束(请参阅文档
droppedDueToLateness
here)。这些事件在会话 window 函数之后的第一个GroupByKey
中被删除,以后无法处理。我们不想丢弃任何迟到的数据,所以解决方案是在每个事件进入会话部分之前检查它的时间戳,并仅将不会被丢弃的事件流式传输到会话部分 - 满足此条件的事件:event_timestamp >= event_arrival_time - (gap_duration + allowed_lateness)
。其余部分将在没有会话数据的情况下写入 BigQuery(显然,如果事件的时间戳在event_arrival_time - (gap_duration + allowed_lateness)
之前,即使存在该事件属于的实时会话,apache beam 也会丢弃一个事件...)
p.s - 在 bounded sessions 部分,他演示了如何实现有时间限制的会话,我相信他有一个错误,允许会话超出提供的最大大小。一旦会话超过最大大小,就可以发送与该会话相交且在会话之前的延迟数据,从而使会话的开始时间更早,从而扩展会话。此外,一旦会话超过最大大小,就不能添加属于它但不扩展它的事件。
为了解决这个问题,我切换了 current
window 跨度和 if-statement 的顺序并编辑了 if-statement (检查会话最大大小的那个)在 window 跨越部分的 mergeWindows
函数中,因此会话不能超过最大大小,只能添加不超过最大大小的数据。这是我的实现:
public void mergeWindows(MergeContext c) throws Exception {
List<IntervalWindow> sortedWindows = new ArrayList<>();
for (IntervalWindow window : c.windows()) {
sortedWindows.add(window);
}
Collections.sort(sortedWindows);
List<MergeCandidate> merges = new ArrayList<>();
MergeCandidate current = new MergeCandidate();
for (IntervalWindow window : sortedWindows) {
MergeCandidate next = new MergeCandidate(window);
if (current.intersects(window)) {
if ((current.union == null || new Duration(current.union.start(), window.end()).getMillis() <= maxSize.plus(gapDuration).getMillis())) {
current.add(window);
continue;
}
}
merges.add(current);
current = next;
}
merges.add(current);
for (MergeCandidate merge : merges) {
merge.apply(c);
}
}