与谓词下推相关的数据块分区

databricks partitioning w/ relation to predicate pushdown

我已经搜索了很多简洁的答案,希望有人能帮助我弄清楚数据块分区..

假设我有一个包含列的数据框:YearMonthDaySalesAmountStoreNumber

我想按年、月分区存储这个。所以我可以运行以下命令:

df.write.partitionBy('Year', 'Month').format('csv').save('/mnt/path/', header='true')

这将以以下格式输出数据:/path/Year=2019/Month=05/<file-0000x>.csv

如果我再加载回来,比如:

spark.read.format('csv').options(header='true').load('/mnt/path/').createOrReplaceTempView("temp1")

Q1:这个其实还没有'read'数据吧?即我可能有数十亿条记录.. 但在我实际查询 temp1 之前,没有对源执行任何操作?

Q2-A: 随后,当使用 temp1 查询此数据时,我假设如果我将分区中使用的项目包含在where 子句,将对从磁盘读取的实际文件应用智能过滤?

%sql
select * from temp1 where Year = 2019 and Month = 05 -- OPTIMAL

而以下不会进行任何文件过滤,因为它没有要查找的分区的上下文:

%sql
select * from temp1 where StoreNum = 152 and SalesAmount > 10000 -- SUB-OPTIMAL

Q2-B: 最后,如果我将文件存储为镶木地板格式(而不是 *.csv).. 上面的两个查询 'push down' 都会在到存储的实际数据..但可能以不同的方式?

即第一个仍会使用分区,但第二个 (where StoreNum = 152 and SalesAmount > 10000) 现在将使用镶木地板的柱状存储?虽然 *.csv 没有那个优化?

任何人都可以澄清我对此的想法/理解吗?

资源链接也很好..

Q1,当您在不提供架构的情况下读取 csv 文件时,它必须推断架构并且立即读取所有文件(如果可以的话,它可能会在此时过滤分区)。 如果您要提供一个架构,那么您对过滤的假设和执行事件假设都是正确的。

Q2。不确定我是否遵循。当您说两个查询时,您是指上方还是下方?在下面的一个写,你希望过滤如何在写上工作?

如果您指的是 parquet 中的前两个查询,那么第一个查询将消除大部分文件并且速度非常快。第二个将希望通过使用文件中的统计信息来显示它不需要读取它们来跳过一些数据。但它仍然会触及每个文件。

您可能会发现这很有用https://db-blog.web.cern.ch/blog/luca-canali/2017-06-diving-spark-and-parquet-workloads-example

A1:你对createOrReplaceTempView的评价是对的。这将对当前 Spark 会话 lazily 进行评估。换句话说,如果您终止 Spark 会话而不访问它,数据将永远不会传输到 temp1.

A2:让我们使用您的代码通过一个例子来分析一下这个案例。首先让我们保存您的数据:

df.write.mode("overwrite").option("header", "true")
  .partitionBy("Year", "Month")
  .format("csv")
  .save("/tmp/partition_test1/")

然后加载它:

val df1 = spark.read.option("header", "true")
                .csv("/tmp/partition_test1/")
                .where($"Year" === 2019 && $"Month" === 5)

执行 df1.explain 将 return:

== Physical Plan ==
*(1) FileScan csv [Day#328,SalesAmount#329,StoreNumber#330,Year#331,Month#332] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 0, Partition
Filters: [isnotnull(Year#331), isnotnull(Month#332), (Year#331 = 2019), (Month#332 = 5)], PushedFilters: [], ReadSchema: struct<Day:string,SalesAmount:string,StoreNumber:string>

如您所见,PushedFilters: [] 数组是空的,而 PartitionFilters[] 不是,这表明 Spark 能够对分区应用过滤,因此修剪不满足 where 语句。

如果我们稍微将 Spark 查询更改为:

df1.where($"StoreNumber" === 1 && $"Year" === 2011 && $"Month" === 11).explain

== Physical Plan ==
*(1) Project [Day#462, SalesAmount#463, StoreNumber#464, Year#465, Month#466]
+- *(1) Filter (isnotnull(StoreNumber#464) && (cast(StoreNumber#464 as int) = 1))
   +- *(1) FileScan csv [Day#462,SalesAmount#463,StoreNumber#464,Year#465,Month#466] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/tmp/partition_test1], PartitionCount: 1, Par
titionFilters: [isnotnull(Month#466), isnotnull(Year#465), (Year#465 = 2011), (Month#466 = 11)], PushedFilters: [IsNotNull(StoreNumber)], ReadSchema: struct<Day:string,SalesAmount:string,Store
Number:string>

现在 PartitionFiltersPushedFilters 都将最小化 Spark 工作负载。如您所见,Spark 首先通过 PartitionFilters 识别现有分区,然后应用谓词下推来利用这两个过滤器。

完全相同的情况适用于 parquet 文件,最大的不同是 parquet 将利用谓词下推过滤器将它们与其内部基于柱状的系统(如您已经提到的)结合起来,从而保留指标和统计信息在数据上。因此与 CSV 文件的区别在于,在 CSV 的情况下,当 Spark reading/scanning CSV 文件排除不满足谓词下推条件的记录时,谓词下推将发生。对于 parquet,谓词下推过滤器将传播到 parquet 内部系统,从而导致更大的数据修剪。

在您的情况下,从 createOrReplaceTempView 加载数据不会有所不同,执行计划将保持不变。

一些有用的链接:

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html

https://www.waitingforcode.com/apache-spark-sql/predicate-pushdown-spark-sql/read

https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-SparkStrategy-FileSourceStrategy.html