Spark 是否支持对 S3 中的镶木地板文件进行真正的列扫描?
Does Spark support true column scans over parquet files in S3?
Parquet 数据存储格式的一大好处是 。如果我有一个包含数百列的 'wide' 数据集,但我的查询只涉及其中的几列,那么可以只读取存储这几列的数据,而跳过其余列。
据推测,此功能是通过读取 parquet 文件头部的一些元数据来工作的,这些元数据指示每一列在文件系统中的位置。 reader 然后可以在磁盘上查找以仅读入必要的列。
有谁知道 spark 的默认 parquet reader 是否正确地在 S3 上实现了这种选择性搜索?我认为 ,但理论支持与正确利用该支持的实现之间存在很大差异。
spark 的 parquet reader 与任何其他 InputFormat 一样,
inputFormat 的 None 对 S3 有任何特殊之处。输入格式可以从 LocalFileSystem 、 Hdfs 和 S3 读取,没有为此做特殊优化。
Parquet InpuTFormat 会根据您询问的栏目select积极地为您阅读这些栏目。
如果您想确定(尽管下推谓词在最新的 spark 版本中有效)手动 select 列并编写转换和操作,而不是依赖于 SQL
不,不完全支持谓词下推。当然,这取决于:
- 具体用例
- Spark 版本
- S3 连接器类型和版本
为了检查您的具体用例,您可以在 Spark 中启用 DEBUG 日志级别,并且 运行 您的查询。然后,您可以看到在 S3 (HTTP) 请求期间是否有 "seeks" 以及实际发送了多少请求。像这样:
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]"
....
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]"
....
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"
这是最近由于 Spark 2.1 无法根据 Parquet 文件中存储的元数据计算数据集中所有行的 COUNT(*)
而打开的问题报告示例:https://issues.apache.org/jira/browse/SPARK-21074
免责声明:我没有明确的答案,也不想充当权威来源,但我花了一些时间研究 Spark 2.2+ 中的 parquet 支持,希望我的回答能对我们所有人有所帮助更接近正确答案。
Does Parquet on S3 avoid pulling the data for unused columns from S3 and only retrieve the file chunks it needs, or does it pull the whole file?
我使用我今天从 master.
构建的 Spark 2.3.0-SNAPSHOT
parquet
数据源格式由ParquetFileFormat which is a FileFormat处理。
如果我是对的,阅读部分由buildReaderWithPartitionValues方法处理(覆盖FileFormat
)。
buildReaderWithPartitionValues
专门用于当 FileSourceScanExec
物理运算符被请求用于所谓的输入 RDDs 时,实际上是单个 RDD 以在执行 WholeStageCodegenExec
时生成内部行。
话虽如此,我认为回顾 buildReaderWithPartitionValues
所做的事情可能会让我们更接近最终答案。
当您查看 the line 时,您可以确信我们走在正确的轨道上。
// Try to push down filters when filter push-down is enabled.
该代码路径取决于 spark.sql.parquet.filterPushdown
Spark 属性 is turned on by default。
spark.sql.parquet.filterPushdown Enables Parquet filter push-down optimization when set to true.
这将我们引向 parquet-hadoop 的 ParquetInputFormat.setFilterPredicate iff 过滤器已定义。
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
当代码回退到 parquet-mr(而不是使用所谓的矢量化 parquet 解码 reader)时使用过滤器时,代码稍后会变得更有趣。这是我不太理解的部分(除了我在代码中看到的)。
请注意,矢量化 parquet 解码 reader 由 spark.sql.parquet.enableVectorizedReader
Spark 属性 控制,默认打开。
提示:要了解使用了 if
表达式的哪一部分,请为 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
记录器启用 DEBUG
日志记录级别。
为了查看所有下推过滤器,您可以打开 org.apache.spark.sql.execution.FileSourceScanExec
记录器的 INFO
记录级别。你应该 see the following in the logs:
INFO Pushed Filters: [pushedDownFilters]
我确实希望,如果它不是一个明确的答案,它会有所帮助,并且有人会在我停下来的地方把它捡起来,尽快成为一个。 希望最后破灭 :)
这需要分解
- Parquet 代码是否从 spark 获取谓词(是)
- 然后 parquet 会尝试使用 Hadoop
FileSystem
seek()
+ read()
或 readFully(position, buffer, length)
调用有选择地只读取那些列吗?是
- S3 连接器是否将这些文件操作转换为高效的 HTTP GET 请求?在 Amazon EMR 中:是的。在 Apache Hadoop 中,您需要在类路径上安装 hadoop 2.8 并正确设置
spark.hadoop.fs.s3a.experimental.fadvise=random
以触发随机访问。
Hadoop 2.7 和更早版本处理主动搜索()绕文件很糟糕,因为它们总是启动 GET 偏移量文件结束,对下一个搜索感到惊讶,必须中止该连接,重新打开一个新的TCP/HTTPS 1.1连接(慢,CPU重),再做一遍,反复做。随机 IO 操作会伤害诸如 .csv.gz 之类的批量加载,但对于获得 ORC/Parquet 性能至关重要。
您无法在 Hadoop 2.7 的 hadoop-aws JAR 上获得加速。如果需要,您需要更新 hadoop*.jar 和依赖项,或者针对 Hadoop 2.8
从头开始构建 Spark
请注意,Hadoop 2.8+ 还有一个不错的小功能:如果您在日志语句中的 S3A 文件系统客户端上调用 toString()
,它会打印出所有文件系统 IO 统计信息,包括丢弃了多少数据在寻找中,中止 TCP 连接 &c。帮助您弄清楚发生了什么。
2018-04-13 警告::不要尝试将 Hadoop 2.8+ hadoop-aws
JAR 与 hadoop-2.7 的其余部分一起放在类路径中JAR 设置并期望看到任何加速。您将看到的只是堆栈跟踪。您需要更新所有 hadoop JAR 及其传递依赖项。
Parquet 数据存储格式的一大好处是
据推测,此功能是通过读取 parquet 文件头部的一些元数据来工作的,这些元数据指示每一列在文件系统中的位置。 reader 然后可以在磁盘上查找以仅读入必要的列。
有谁知道 spark 的默认 parquet reader 是否正确地在 S3 上实现了这种选择性搜索?我认为
spark 的 parquet reader 与任何其他 InputFormat 一样,
-
inputFormat 的
None 对 S3 有任何特殊之处。输入格式可以从 LocalFileSystem 、 Hdfs 和 S3 读取,没有为此做特殊优化。
Parquet InpuTFormat 会根据您询问的栏目select积极地为您阅读这些栏目。
如果您想确定(尽管下推谓词在最新的 spark 版本中有效)手动 select 列并编写转换和操作,而不是依赖于 SQL
不,不完全支持谓词下推。当然,这取决于:
- 具体用例
- Spark 版本
- S3 连接器类型和版本
为了检查您的具体用例,您可以在 Spark 中启用 DEBUG 日志级别,并且 运行 您的查询。然后,您可以看到在 S3 (HTTP) 请求期间是否有 "seeks" 以及实际发送了多少请求。像这样:
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 >> "GET /test/part-00000-b8a8a1b7-0581-401f-b520-27fa9600f35e.snappy.parquet HTTP/1.1[\r][\n]"
....
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Range: bytes 0-7472093/7472094[\r][\n]"
....
17/06/13 05:46:50 DEBUG wire: http-outgoing-1 << "Content-Length: 7472094[\r][\n]"
这是最近由于 Spark 2.1 无法根据 Parquet 文件中存储的元数据计算数据集中所有行的 COUNT(*)
而打开的问题报告示例:https://issues.apache.org/jira/browse/SPARK-21074
免责声明:我没有明确的答案,也不想充当权威来源,但我花了一些时间研究 Spark 2.2+ 中的 parquet 支持,希望我的回答能对我们所有人有所帮助更接近正确答案。
Does Parquet on S3 avoid pulling the data for unused columns from S3 and only retrieve the file chunks it needs, or does it pull the whole file?
我使用我今天从 master.
构建的 Spark 2.3.0-SNAPSHOTparquet
数据源格式由ParquetFileFormat which is a FileFormat处理。
如果我是对的,阅读部分由buildReaderWithPartitionValues方法处理(覆盖FileFormat
)。
buildReaderWithPartitionValues
专门用于当 FileSourceScanExec
物理运算符被请求用于所谓的输入 RDDs 时,实际上是单个 RDD 以在执行 WholeStageCodegenExec
时生成内部行。
话虽如此,我认为回顾 buildReaderWithPartitionValues
所做的事情可能会让我们更接近最终答案。
当您查看 the line 时,您可以确信我们走在正确的轨道上。
// Try to push down filters when filter push-down is enabled.
该代码路径取决于 spark.sql.parquet.filterPushdown
Spark 属性 is turned on by default。
spark.sql.parquet.filterPushdown Enables Parquet filter push-down optimization when set to true.
这将我们引向 parquet-hadoop 的 ParquetInputFormat.setFilterPredicate iff 过滤器已定义。
if (pushed.isDefined) {
ParquetInputFormat.setFilterPredicate(hadoopAttemptContext.getConfiguration, pushed.get)
}
当代码回退到 parquet-mr(而不是使用所谓的矢量化 parquet 解码 reader)时使用过滤器时,代码稍后会变得更有趣。这是我不太理解的部分(除了我在代码中看到的)。
请注意,矢量化 parquet 解码 reader 由 spark.sql.parquet.enableVectorizedReader
Spark 属性 控制,默认打开。
提示:要了解使用了 if
表达式的哪一部分,请为 org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
记录器启用 DEBUG
日志记录级别。
为了查看所有下推过滤器,您可以打开 org.apache.spark.sql.execution.FileSourceScanExec
记录器的 INFO
记录级别。你应该 see the following in the logs:
INFO Pushed Filters: [pushedDownFilters]
我确实希望,如果它不是一个明确的答案,它会有所帮助,并且有人会在我停下来的地方把它捡起来,尽快成为一个。 希望最后破灭 :)
这需要分解
- Parquet 代码是否从 spark 获取谓词(是)
- 然后 parquet 会尝试使用 Hadoop
FileSystem
seek()
+read()
或readFully(position, buffer, length)
调用有选择地只读取那些列吗?是 - S3 连接器是否将这些文件操作转换为高效的 HTTP GET 请求?在 Amazon EMR 中:是的。在 Apache Hadoop 中,您需要在类路径上安装 hadoop 2.8 并正确设置
spark.hadoop.fs.s3a.experimental.fadvise=random
以触发随机访问。
Hadoop 2.7 和更早版本处理主动搜索()绕文件很糟糕,因为它们总是启动 GET 偏移量文件结束,对下一个搜索感到惊讶,必须中止该连接,重新打开一个新的TCP/HTTPS 1.1连接(慢,CPU重),再做一遍,反复做。随机 IO 操作会伤害诸如 .csv.gz 之类的批量加载,但对于获得 ORC/Parquet 性能至关重要。
您无法在 Hadoop 2.7 的 hadoop-aws JAR 上获得加速。如果需要,您需要更新 hadoop*.jar 和依赖项,或者针对 Hadoop 2.8
从头开始构建 Spark请注意,Hadoop 2.8+ 还有一个不错的小功能:如果您在日志语句中的 S3A 文件系统客户端上调用 toString()
,它会打印出所有文件系统 IO 统计信息,包括丢弃了多少数据在寻找中,中止 TCP 连接 &c。帮助您弄清楚发生了什么。
2018-04-13 警告::不要尝试将 Hadoop 2.8+ hadoop-aws
JAR 与 hadoop-2.7 的其余部分一起放在类路径中JAR 设置并期望看到任何加速。您将看到的只是堆栈跟踪。您需要更新所有 hadoop JAR 及其传递依赖项。