GCP 数据流 droppedDueToClosedWindow & Commit 请求阶段 S8 和密钥 8 大于 2GB
GCP Dataflow droppedDueToClosedWindow & Commit request for stage S8 and key 8 is larger than 2GB
我们 运行 遇到 Google 云上的数据流问题。我们的管道由各种输入步骤组成,这些步骤通过 GCP PubSub 获取推送的数据。然后我们汇总数据并对其进行排序。这些 1 步骤对于 Dataflow 和我们配置的 window 来说显然太重了。我们在步骤中遇到异常 [2]。我们还看到了这些指标:
droppedDueToClosedWindow 3,838,662 Bids/AggregateExchangeOrders
droppedDueToClosedWindow 21,060,627 Asks/AggregateExchangeOrders
现在我正在寻求如何解决这个问题的建议。我是否应该分解这些步骤,例如迭代和排序可以通过并行步骤完成?
有没有办法获得更多关于到底发生了什么的信息?
我们应该增加工人的数量吗? (目前 1)。
我们对 Dataflow 比较陌生。 .. 非常欢迎好的建议。
编辑:我正在添加一些有关步骤的详细信息。
下面的步骤是这样 'chained' 的:
@Override
public PCollection<KV<KV<String, String>, List<ExchangeOrder>>> expand(PCollection<KV<String, KV<String, String>>> input) {
return input.apply("PairWithType", new ByPairWithType(type))
.apply("UnfoldExchangeOrders", new ByAggregatedExchangeOrders())
.apply("AggregateExchangeOrders", GroupByKey.<KV<String, String>, KV<String, KV<BigDecimal, BigDecimal>>>create())
.apply("ReorderExchangeOrders", ParDo.of(new ReorderExchangeOrders()));
}
聚合交换订单:
所以在这里,显然我们遍历了订单集合,并解析了类型(两次),所以它是一个大的小数点。
这让我想到,我们可以跳过一个解析步骤,如下所述:
Convert string to BigDecimal in java
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, KV<String, String>> key = c.element().getKey();
List<KV<String, String>> value = c.element().getValue();
value.forEach(
exchangeOrder -> {
try {
BigDecimal unitPrice = BigDecimal.valueOf(Double.valueOf(exchangeOrder.getKey()));
BigDecimal quantity = BigDecimal.valueOf(Double.valueOf(exchangeOrder.getValue()));
if (quantity.compareTo(BigDecimal.ZERO) != 0) {
// Exclude exchange orders with no quantity.
c.output(KV.of(key.getValue(), KV.of(key.getKey(), KV.of(unitPrice, quantity))));
}
} catch (NumberFormatException e) {
// Exclude exchange orders with invalid element.
}
});
}
...接下来我们进行分组和排序。 (并可选择反转它),这一步似乎并没有带来巨大的负担。
重新订购交换订单:
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, String> pairAndType = c.element().getKey();
Iterable<KV<String, KV<BigDecimal, BigDecimal>>> exchangeOrderBook = c.element().getValue();
List<ExchangeOrder> list = new ArrayList<>();
exchangeOrderBook.forEach(exchangeOrder -> list.add(
new ExchangeOrder(exchangeOrder.getKey(), exchangeOrder.getValue().getKey(), exchangeOrder.getValue().getValue())));
// Asks are sorted in ASC order
Collections.sort(list);
// Bids are sorted in DSC order
if (pairAndType.getValue().equals(EXCHANGE_ORDER_TYPE.BIDS.toString())) {
Collections.reverse(list);
}
c.output(KV.of(pairAndType, list));
}
[ 1 ] 数据流截图:
[ 2 ]异常:阶段S8提交请求,键8大于2GB,无法处理。
java.lang.IllegalStateException: Commit request for stage S8 and key 8 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
com.google.cloud.dataflow.worker.StreamingDataflowWorker$Commit.getSize(StreamingDataflowWorker.java:327)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.lambda$new[=14=](StreamingDataflowWorker.java:342)
错误信息比较直白。
正如许多评论指出的那样,问题的根本原因是包含一个 DoFn 的所有结果的结构大于 2GB,您最好的选择是以某种方式对数据进行分区以使您的工作单位更小。
在代码中,我看到 DoFn 返回的一些结构是 KV> 形式的嵌套结构。这种安排会强制 Dataflow 将整个响应发送回一个单一的包中,并防止它把它分成更小的部分。
一种可能的解决方案是在管道中尽可能长时间地使用复合键而不是嵌套结构,并且仅在绝对必要时才组合它们。
例如,
instead of KV>, the DoFn could return
KV<(concat(Key1, Key2)), Value>
这会将工作单元拆分成更小的集合,然后可以将这些集合并行分派给多个工作人员。
为了回答其他问题,增加 worker 的数量不会有任何效果,因为 DoFn 生成的庞大集合看起来是不可拆分的。添加日志记录以查看集合如何达到 2GB 可能会提供有用的提示来防止这种情况发生。
我们 运行 遇到 Google 云上的数据流问题。我们的管道由各种输入步骤组成,这些步骤通过 GCP PubSub 获取推送的数据。然后我们汇总数据并对其进行排序。这些 1 步骤对于 Dataflow 和我们配置的 window 来说显然太重了。我们在步骤中遇到异常 [2]。我们还看到了这些指标:
droppedDueToClosedWindow 3,838,662 Bids/AggregateExchangeOrders
droppedDueToClosedWindow 21,060,627 Asks/AggregateExchangeOrders
现在我正在寻求如何解决这个问题的建议。我是否应该分解这些步骤,例如迭代和排序可以通过并行步骤完成?
有没有办法获得更多关于到底发生了什么的信息? 我们应该增加工人的数量吗? (目前 1)。
我们对 Dataflow 比较陌生。 .. 非常欢迎好的建议。
编辑:我正在添加一些有关步骤的详细信息。
下面的步骤是这样 'chained' 的:
@Override
public PCollection<KV<KV<String, String>, List<ExchangeOrder>>> expand(PCollection<KV<String, KV<String, String>>> input) {
return input.apply("PairWithType", new ByPairWithType(type))
.apply("UnfoldExchangeOrders", new ByAggregatedExchangeOrders())
.apply("AggregateExchangeOrders", GroupByKey.<KV<String, String>, KV<String, KV<BigDecimal, BigDecimal>>>create())
.apply("ReorderExchangeOrders", ParDo.of(new ReorderExchangeOrders()));
}
聚合交换订单:
所以在这里,显然我们遍历了订单集合,并解析了类型(两次),所以它是一个大的小数点。 这让我想到,我们可以跳过一个解析步骤,如下所述: Convert string to BigDecimal in java
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, KV<String, String>> key = c.element().getKey();
List<KV<String, String>> value = c.element().getValue();
value.forEach(
exchangeOrder -> {
try {
BigDecimal unitPrice = BigDecimal.valueOf(Double.valueOf(exchangeOrder.getKey()));
BigDecimal quantity = BigDecimal.valueOf(Double.valueOf(exchangeOrder.getValue()));
if (quantity.compareTo(BigDecimal.ZERO) != 0) {
// Exclude exchange orders with no quantity.
c.output(KV.of(key.getValue(), KV.of(key.getKey(), KV.of(unitPrice, quantity))));
}
} catch (NumberFormatException e) {
// Exclude exchange orders with invalid element.
}
});
}
...接下来我们进行分组和排序。 (并可选择反转它),这一步似乎并没有带来巨大的负担。
重新订购交换订单:
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, String> pairAndType = c.element().getKey();
Iterable<KV<String, KV<BigDecimal, BigDecimal>>> exchangeOrderBook = c.element().getValue();
List<ExchangeOrder> list = new ArrayList<>();
exchangeOrderBook.forEach(exchangeOrder -> list.add(
new ExchangeOrder(exchangeOrder.getKey(), exchangeOrder.getValue().getKey(), exchangeOrder.getValue().getValue())));
// Asks are sorted in ASC order
Collections.sort(list);
// Bids are sorted in DSC order
if (pairAndType.getValue().equals(EXCHANGE_ORDER_TYPE.BIDS.toString())) {
Collections.reverse(list);
}
c.output(KV.of(pairAndType, list));
}
[ 1 ] 数据流截图:
[ 2 ]异常:阶段S8提交请求,键8大于2GB,无法处理。
java.lang.IllegalStateException: Commit request for stage S8 and key 8 is larger than 2GB and cannot be processed. This may be caused by grouping a very large amount of data in a single window without using Combine, or by producing a large amount of data from a single input element.
com.google.cloud.dataflow.worker.StreamingDataflowWorker$Commit.getSize(StreamingDataflowWorker.java:327)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.lambda$new[=14=](StreamingDataflowWorker.java:342)
错误信息比较直白。 正如许多评论指出的那样,问题的根本原因是包含一个 DoFn 的所有结果的结构大于 2GB,您最好的选择是以某种方式对数据进行分区以使您的工作单位更小。
在代码中,我看到 DoFn 返回的一些结构是 KV> 形式的嵌套结构。这种安排会强制 Dataflow 将整个响应发送回一个单一的包中,并防止它把它分成更小的部分。
一种可能的解决方案是在管道中尽可能长时间地使用复合键而不是嵌套结构,并且仅在绝对必要时才组合它们。
例如,
instead of KV>, the DoFn could return
KV<(concat(Key1, Key2)), Value>
这会将工作单元拆分成更小的集合,然后可以将这些集合并行分派给多个工作人员。
为了回答其他问题,增加 worker 的数量不会有任何效果,因为 DoFn 生成的庞大集合看起来是不可拆分的。添加日志记录以查看集合如何达到 2GB 可能会提供有用的提示来防止这种情况发生。