直接查询文件与读取文件后查询数据帧

Querying File Directly vs Querying Data Frame after reading the File

方法一: 直接查询镶木地板文件:

val sqlDF = spark.sql("SELECT columns FROM parquet.`sample.parquet`")

方法二: 读取 parquet 文件后查询 Dataframe 为:

df = spark.read.parquet(path_to_parquet_file)
df.select(columns)

方法三: 查询临时视图为:

df.createOrReplaceTempView("sample")
val sqlDF = spark.sql("SELECT columns FROM sample")
  1. 在幕后,所有 3 个基本上都以相同的方式执行吗?
  2. 在方法 1 中,parquet 是否被转换为 dataframe/dataset 在执行查询之前 ?
  3. 这 3 种方法中哪种有效,为什么? (如果它们是 不同)
  4. 这些方法有具体的用例吗? (如果它们是 不同)

谢谢!

简答

是的。您所说明的使用 Spark 查询 Parquet 文件的 3 种方式以相同的方式执行。

长答案

之所以会这样,是因为结合了 Spark 的两个特性:lazy evaluation & query optimization

作为开发人员,您可以将 Spark 操作拆分为多个步骤(就像您在方法 2 中所做的那样)。在内部,Spark(懒惰地)联合评估操作并对其应用优化。在这种情况下,Spark 可以通过列修剪来优化操作(基本上,它不会将整个 parquet 数据读入内存;只有您请求的特定列。)

创建临时视图的第三种方法就是将读取到的数据命名,以便您在以后的操作中参考。它不会改变它最初的计算方式。

有关 Spark 在读取 Parquet 时执行的优化的更多信息,请参阅此 in-depth article

注意: 正如我在对该问题的评论中提到的,您已经在方法 2 中选择了特定的列;而另外两个读取整个数据。因为,这些本质上是不同的操作,所以在执行上会有差异。上面的答案假设在三种方法中的每一种中都执行了类似的操作(读取完整数据或从文件中读取某些特定列)。

如果您要评估其中哪“3”个最适合相同 objective,它们之间没有区别。 physical plan 说出你的问题 - 'Behind the scene?'.

方法一:

sqlDF = spark.sql("SELECT CallNumber,CallFinalDisposition FROM parquet.`/tmp/ParquetA`").show()

== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#2988 as string) AS CallNumber#3026, CallFinalDisposition#2992]
   +- *(1) FileScan parquet [CallNumber#2988,CallFinalDisposition#2992] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>

方法二:

df = spark.read.parquet('/tmp/ParquetA')
df.select("CallNumber","CallFinalDisposition").show()

== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#3100 as string) AS CallNumber#3172, CallFinalDisposition#3104]
   +- *(1) FileScan parquet [CallNumber#3100,CallFinalDisposition#3104] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>

方法三:

tempDF = spark.read.parquet('/tmp/ParquetA/')
tempDF.createOrReplaceTempView("temptable");
tiny = spark.sql("SELECT CallNumber,CallFinalDisposition FROM temptable").show()

== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#2910 as string) AS CallNumber#2982, CallFinalDisposition#2914]
   +- *(1) FileScan parquet [CallNumber#2910,CallFinalDisposition#2914] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>