数据流性能问题
Dataflow performance issues
我知道几周前对 CDF 服务进行了更新(更改了默认工作器类型和附加的 PD),并且明确表示这会使批处理作业变慢。然而,我们的工作绩效已经下降到无法实际满足我们业务需求的程度。
例如,特别是对于我们的一项工作:它从 BigQuery 中的 table 读取约 270 万行,有 6 个侧输入(BQ tables),做一些简单的字符串转换,最后将多个输出 (3) 写入 BigQuery。这过去需要 5-6 分钟,现在需要 15-20 分钟 - 无论我们使用多少虚拟机。
我们能做些什么来让速度恢复到我们以前看到的速度吗?
以下是一些统计数据:
- 从 BQ table 读取 2,744,897 行 (294MB)
- 6个BQ侧输入
- 3个多输出到BQ,其中2个是2,744,897,另外1,500行
- 运行 在区域 asia-east1-b
- 以下时间包括工作线程池启动和关闭
10 个虚拟机 (n1-standard-2)
16 分 5 秒
2015-04-22_19_42_20-4740106543213058308
10 个虚拟机 (n1-standard-4)
17 分 11 秒
2015-04-22_20_04_58-948224342106865432
10 个虚拟机 (n1-standard-1)
18 分 44 秒
2015-04-22_19_42_20-4740106543213058308
20 个虚拟机 (n1-standard-2)
22 分 53 秒
2015-04-22_21_26_53-18171886778433479315
50 个虚拟机 (n1-standard-2)
17 分 26 秒
2015-04-22_21_51_37-16026777746175810525
100 个虚拟机 (n1-standard-2)
19 分 33 秒
2015-04-22_22_32_13-9727928405932256127
证据似乎表明您的管道处理边输入的方式存在问题。具体来说,对于主输入的每个元素,副输入很可能会一次又一次地从 BigQuery 中重新读取。这与数据流工作者使用的虚拟机类型的变化完全正交,如下所述。
这与 Java 版本 0.3.150326 的 Dataflow SDK 中所做的更改密切相关。在该版本中,我们将侧输入 API 更改为按 window 应用。调用 sideInput()
现在 return 值仅在特定 window 对应于主输入元素的 window,而不是整个侧输入 PCollectionView
。因此,无法再从 DoFn
的 startBundle
和 finishBundle
中调用 sideInput()
,因为 window 尚不清楚。
例如,以下代码片段存在一个问题,会导致重新读取每个输入元素的侧输入。
@Override
public void processElement(ProcessContext c) throws Exception {
Iterable<String> uniqueIds = c.sideInput(iterableView);
for (String item : uniqueIds) {
[...]
}
c.output([...]);
}
可以通过在第一次调用 processElement
期间将侧输入缓存到转换的 List
成员变量(假设它适合内存)来改进此代码,并使用缓存的 List
而不是后续调用中的侧输入。
此解决方法应该可以恢复您之前看到的性能,当时可以从 startBundle
调用侧输入。从长远来看,我们将致力于更好地缓存侧输入。 (如果这不能帮助完全解决问题,请通过电子邮件联系我们并分享相关代码片段。)
另外,确实,云数据流服务在 2015 年 4 月 9 日左右进行了更新,更改了数据流工作者使用的默认虚拟机类型。具体来说,我们减少了每个工作人员的默认核心数,因为我们的基准测试表明它对于典型的工作具有成本效益。这 不是 任何类型的数据流服务的减速——它只是在默认情况下以每个工作人员较少的资源运行。用户仍然可以选择覆盖工作人员的数量以及工作人员使用的虚拟机类型。
我们追踪到了这个问题。这是当侧输入从 BigQuery table 读取数据时,它的数据已流式传输,而不是批量加载。当我们复制 table(s),并从副本中读取时,一切正常。
但是,这只是一种解决方法。 Dataflow 应该能够处理 BigQuery 中的流式传输 table 作为侧输入。
我知道几周前对 CDF 服务进行了更新(更改了默认工作器类型和附加的 PD),并且明确表示这会使批处理作业变慢。然而,我们的工作绩效已经下降到无法实际满足我们业务需求的程度。
例如,特别是对于我们的一项工作:它从 BigQuery 中的 table 读取约 270 万行,有 6 个侧输入(BQ tables),做一些简单的字符串转换,最后将多个输出 (3) 写入 BigQuery。这过去需要 5-6 分钟,现在需要 15-20 分钟 - 无论我们使用多少虚拟机。
我们能做些什么来让速度恢复到我们以前看到的速度吗?
以下是一些统计数据:
- 从 BQ table 读取 2,744,897 行 (294MB)
- 6个BQ侧输入
- 3个多输出到BQ,其中2个是2,744,897,另外1,500行
- 运行 在区域 asia-east1-b
- 以下时间包括工作线程池启动和关闭
10 个虚拟机 (n1-standard-2) 16 分 5 秒 2015-04-22_19_42_20-4740106543213058308
10 个虚拟机 (n1-standard-4) 17 分 11 秒 2015-04-22_20_04_58-948224342106865432
10 个虚拟机 (n1-standard-1) 18 分 44 秒 2015-04-22_19_42_20-4740106543213058308
20 个虚拟机 (n1-standard-2) 22 分 53 秒 2015-04-22_21_26_53-18171886778433479315
50 个虚拟机 (n1-standard-2) 17 分 26 秒 2015-04-22_21_51_37-16026777746175810525
100 个虚拟机 (n1-standard-2) 19 分 33 秒 2015-04-22_22_32_13-9727928405932256127
证据似乎表明您的管道处理边输入的方式存在问题。具体来说,对于主输入的每个元素,副输入很可能会一次又一次地从 BigQuery 中重新读取。这与数据流工作者使用的虚拟机类型的变化完全正交,如下所述。
这与 Java 版本 0.3.150326 的 Dataflow SDK 中所做的更改密切相关。在该版本中,我们将侧输入 API 更改为按 window 应用。调用 sideInput()
现在 return 值仅在特定 window 对应于主输入元素的 window,而不是整个侧输入 PCollectionView
。因此,无法再从 DoFn
的 startBundle
和 finishBundle
中调用 sideInput()
,因为 window 尚不清楚。
例如,以下代码片段存在一个问题,会导致重新读取每个输入元素的侧输入。
@Override
public void processElement(ProcessContext c) throws Exception {
Iterable<String> uniqueIds = c.sideInput(iterableView);
for (String item : uniqueIds) {
[...]
}
c.output([...]);
}
可以通过在第一次调用 processElement
期间将侧输入缓存到转换的 List
成员变量(假设它适合内存)来改进此代码,并使用缓存的 List
而不是后续调用中的侧输入。
此解决方法应该可以恢复您之前看到的性能,当时可以从 startBundle
调用侧输入。从长远来看,我们将致力于更好地缓存侧输入。 (如果这不能帮助完全解决问题,请通过电子邮件联系我们并分享相关代码片段。)
另外,确实,云数据流服务在 2015 年 4 月 9 日左右进行了更新,更改了数据流工作者使用的默认虚拟机类型。具体来说,我们减少了每个工作人员的默认核心数,因为我们的基准测试表明它对于典型的工作具有成本效益。这 不是 任何类型的数据流服务的减速——它只是在默认情况下以每个工作人员较少的资源运行。用户仍然可以选择覆盖工作人员的数量以及工作人员使用的虚拟机类型。
我们追踪到了这个问题。这是当侧输入从 BigQuery table 读取数据时,它的数据已流式传输,而不是批量加载。当我们复制 table(s),并从副本中读取时,一切正常。
但是,这只是一种解决方法。 Dataflow 应该能够处理 BigQuery 中的流式传输 table 作为侧输入。