Parquet 谓词下推是否使用 Spark 非 EMR 在 S3 上工作?
is Parquet predicate pushdown works on S3 using Spark non EMR?
只是想知道 Parquet 谓词下推是否也适用于 S3,而不仅仅是 HDFS。特别是如果我们使用 Spark(非 EMR)。
进一步的解释可能会有所帮助,因为它可能涉及对分布式文件系统的理解。
Spark 使用 HDFS parquet 和 s3 库,因此可以使用相同的逻辑。
(在 spark 1.6 中,他们为平面模式镶木地板文件添加了更快的快捷方式)
是的。过滤器下推不依赖于底层文件系统。它仅取决于 spark.sql.parquet.filterPushdown
和过滤器类型(并非所有过滤器都可以向下推)。
我自己也在想这个,所以我就测试了一下。我们使用 EMR 集群 和 Spark 1.6.1 .
- 我在 Spark 中生成了一些虚拟数据,并将其保存为本地和 S3 上的镶木地板文件。
- 我创建了多个具有不同类型过滤器和列选择的 Spark 作业。我 运行 这些测试一次用于本地文件,一次用于 S3 文件。
- 然后我使用 Spark History Server 查看每个作业有多少数据作为输入。
结果:
- 对于本地 parquet 文件:结果表明,当作业包含过滤器或列选择时,列选择和过滤器被下推到读取,因为输入大小减小。
- 对于 S3 parquet 文件: 输入大小始终与处理所有数据的 Spark 作业相同。 None 的过滤器或列选择被下推到读取。 镶木地板文件始终从 S3 完全加载。即使查询计划 (.queryExecution.executedPlan) 显示过滤器被下推。
我会在有空的时候添加更多关于测试和结果的细节。
这是我推荐的用于 s3a 工作的密钥
spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.hadoop.parquet.enable.summary-metadata false
spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true
为了完成这项工作。使用 S3A“零重命名提交者”(hadoop 3.1+)或等效的 EMR。原始的 FileOutputCommitter 缓慢且不安全
最近我在 Spark 2.4 上尝试了这个,看起来 Pushdown 谓词适用于 s3。
这是 spark sql 查询:
explain select * from default.my_table where month = '2009-04' and site = 'http://jdnews.com/sports/game_1997_jdnsports__article.html/play_rain.html' limit 100;
这是输出的一部分:
PartitionFilters: [isnotnull(month#6), (month#6 = 2009-04)], PushedFilters: [IsNotNull(site), EqualTo(site,http://jdnews.com/sports/game_1997_jdnsports__article.html/play_ra...
这清楚地表明 PushedFilters 不为空。
注意:使用的table是在AWS S3
之上创建的
只是想知道 Parquet 谓词下推是否也适用于 S3,而不仅仅是 HDFS。特别是如果我们使用 Spark(非 EMR)。
进一步的解释可能会有所帮助,因为它可能涉及对分布式文件系统的理解。
Spark 使用 HDFS parquet 和 s3 库,因此可以使用相同的逻辑。 (在 spark 1.6 中,他们为平面模式镶木地板文件添加了更快的快捷方式)
是的。过滤器下推不依赖于底层文件系统。它仅取决于 spark.sql.parquet.filterPushdown
和过滤器类型(并非所有过滤器都可以向下推)。
我自己也在想这个,所以我就测试了一下。我们使用 EMR 集群 和 Spark 1.6.1 .
- 我在 Spark 中生成了一些虚拟数据,并将其保存为本地和 S3 上的镶木地板文件。
- 我创建了多个具有不同类型过滤器和列选择的 Spark 作业。我 运行 这些测试一次用于本地文件,一次用于 S3 文件。
- 然后我使用 Spark History Server 查看每个作业有多少数据作为输入。
结果:
- 对于本地 parquet 文件:结果表明,当作业包含过滤器或列选择时,列选择和过滤器被下推到读取,因为输入大小减小。
- 对于 S3 parquet 文件: 输入大小始终与处理所有数据的 Spark 作业相同。 None 的过滤器或列选择被下推到读取。 镶木地板文件始终从 S3 完全加载。即使查询计划 (.queryExecution.executedPlan) 显示过滤器被下推。
我会在有空的时候添加更多关于测试和结果的细节。
这是我推荐的用于 s3a 工作的密钥
spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.hadoop.parquet.enable.summary-metadata false
spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000
spark.sql.hive.metastorePartitionPruning true
为了完成这项工作。使用 S3A“零重命名提交者”(hadoop 3.1+)或等效的 EMR。原始的 FileOutputCommitter 缓慢且不安全
最近我在 Spark 2.4 上尝试了这个,看起来 Pushdown 谓词适用于 s3。
这是 spark sql 查询:
explain select * from default.my_table where month = '2009-04' and site = 'http://jdnews.com/sports/game_1997_jdnsports__article.html/play_rain.html' limit 100;
这是输出的一部分:
PartitionFilters: [isnotnull(month#6), (month#6 = 2009-04)], PushedFilters: [IsNotNull(site), EqualTo(site,http://jdnews.com/sports/game_1997_jdnsports__article.html/play_ra...
这清楚地表明 PushedFilters 不为空。
注意:使用的table是在AWS S3
之上创建的