Parquet 谓词下推是否使用 Spark 非 EMR 在 S3 上工作?

is Parquet predicate pushdown works on S3 using Spark non EMR?

只是想知道 Parquet 谓词下推是否也适用于 S3,而不仅仅是 HDFS。特别是如果我们使用 Spark(非 EMR)。

进一步的解释可能会有所帮助,因为它可能涉及对分布式文件系统的理解。

Spark 使用 HDFS parquet 和 s3 库,因此可以使用相同的逻辑。 (在 spark 1.6 中,他们为平面模式镶木地板文件添加了更快的快捷方式)

是的。过滤器下推不依赖于底层文件系统。它仅取决于 spark.sql.parquet.filterPushdown 和过滤器类型(并非所有过滤器都可以向下推)。

下推逻辑参见https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala#L313

我自己也在想这个,所以我就测试了一下。我们使用 EMR 集群Spark 1.6.1 .

  • 我在 Spark 中生成了一些虚拟数据,并将其保存为本地和 S3 上的镶木地板文件。
  • 我创建了多个具有不同类型过滤器和列选择的 Spark 作业。我 运行 这些测试一次用于本地文件,一次用于 S3 文件。
  • 然后我使用 Spark History Server 查看每个作业有多少数据作为输入。

结果:

  • 对于本地 parquet 文件:结果表明,当作业包含过滤器或列选择时,列选择和过滤器被下推到读取,因为输入大小减小。
  • 对于 S3 parquet 文件: 输入大小始终与处理所有数据的 Spark 作业相同。 None 的过滤器或列选择被下推到读取。 镶木地板文件始终从 S3 完全加载。即使查询计划 (.queryExecution.executedPlan) 显示过滤器被下推。

我会在有空的时候添加更多关于测试和结果的细节。

这是我推荐的用于 s3a 工作的密钥

spark.sql.parquet.filterPushdown true
spark.sql.parquet.mergeSchema false
spark.hadoop.parquet.enable.summary-metadata false

spark.sql.orc.filterPushdown true
spark.sql.orc.splits.include.file.footer true
spark.sql.orc.cache.stripe.details.size 10000

spark.sql.hive.metastorePartitionPruning true

为了完成这项工作。使用 S3A“零重命名提交者”(hadoop 3.1+)或等效的 EMR。原始的 FileOutputCommitter 缓慢且不安全

最近我在 Spark 2.4 上尝试了这个,看起来 Pushdown 谓词适用于 s3。

这是 spark sql 查询:

explain select * from default.my_table where month = '2009-04' and site = 'http://jdnews.com/sports/game_1997_jdnsports__article.html/play_rain.html' limit 100;

这是输出的一部分:

PartitionFilters: [isnotnull(month#6), (month#6 = 2009-04)], PushedFilters: [IsNotNull(site), EqualTo(site,http://jdnews.com/sports/game_1997_jdnsports__article.html/play_ra...

这清楚地表明 PushedFilters 不为空。

注意:使用的table是在AWS S3

之上创建的