直接查询文件与读取文件后查询数据帧
Querying File Directly vs Querying Data Frame after reading the File
方法一:
直接查询镶木地板文件:
val sqlDF = spark.sql("SELECT columns FROM parquet.`sample.parquet`")
和
方法二:
读取 parquet 文件后查询 Dataframe 为:
df = spark.read.parquet(path_to_parquet_file)
df.select(columns)
和
方法三:
查询临时视图为:
df.createOrReplaceTempView("sample")
val sqlDF = spark.sql("SELECT columns FROM sample")
- 在幕后,所有 3 个基本上都以相同的方式执行吗?
- 在方法 1 中,parquet 是否被转换为 dataframe/dataset
在执行查询之前 ?
- 这 3 种方法中哪种有效,为什么? (如果它们是
不同)
- 这些方法有具体的用例吗? (如果它们是
不同)
谢谢!
简答
是的。您所说明的使用 Spark 查询 Parquet 文件的 3 种方式以相同的方式执行。
长答案
之所以会这样,是因为结合了 Spark 的两个特性:lazy evaluation & query optimization。
作为开发人员,您可以将 Spark 操作拆分为多个步骤(就像您在方法 2 中所做的那样)。在内部,Spark(懒惰地)联合评估操作并对其应用优化。在这种情况下,Spark 可以通过列修剪来优化操作(基本上,它不会将整个 parquet 数据读入内存;只有您请求的特定列。)
创建临时视图的第三种方法就是将读取到的数据命名,以便您在以后的操作中参考。它不会改变它最初的计算方式。
有关 Spark 在读取 Parquet 时执行的优化的更多信息,请参阅此 in-depth article。
注意:
正如我在对该问题的评论中提到的,您已经在方法 2 中选择了特定的列;而另外两个读取整个数据。因为,这些本质上是不同的操作,所以在执行上会有差异。上面的答案假设在三种方法中的每一种中都执行了类似的操作(读取完整数据或从文件中读取某些特定列)。
如果您要评估其中哪“3”个最适合相同 objective,它们之间没有区别。 physical plan
说出你的问题 - 'Behind the scene?'.
方法一:
sqlDF = spark.sql("SELECT CallNumber,CallFinalDisposition FROM parquet.`/tmp/ParquetA`").show()
== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#2988 as string) AS CallNumber#3026, CallFinalDisposition#2992]
+- *(1) FileScan parquet [CallNumber#2988,CallFinalDisposition#2992] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>
方法二:
df = spark.read.parquet('/tmp/ParquetA')
df.select("CallNumber","CallFinalDisposition").show()
== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#3100 as string) AS CallNumber#3172, CallFinalDisposition#3104]
+- *(1) FileScan parquet [CallNumber#3100,CallFinalDisposition#3104] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>
方法三:
tempDF = spark.read.parquet('/tmp/ParquetA/')
tempDF.createOrReplaceTempView("temptable");
tiny = spark.sql("SELECT CallNumber,CallFinalDisposition FROM temptable").show()
== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#2910 as string) AS CallNumber#2982, CallFinalDisposition#2914]
+- *(1) FileScan parquet [CallNumber#2910,CallFinalDisposition#2914] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>
方法一: 直接查询镶木地板文件:
val sqlDF = spark.sql("SELECT columns FROM parquet.`sample.parquet`")
和
方法二: 读取 parquet 文件后查询 Dataframe 为:
df = spark.read.parquet(path_to_parquet_file)
df.select(columns)
和
方法三: 查询临时视图为:
df.createOrReplaceTempView("sample")
val sqlDF = spark.sql("SELECT columns FROM sample")
- 在幕后,所有 3 个基本上都以相同的方式执行吗?
- 在方法 1 中,parquet 是否被转换为 dataframe/dataset 在执行查询之前 ?
- 这 3 种方法中哪种有效,为什么? (如果它们是 不同)
- 这些方法有具体的用例吗? (如果它们是 不同)
谢谢!
简答
是的。您所说明的使用 Spark 查询 Parquet 文件的 3 种方式以相同的方式执行。
长答案
之所以会这样,是因为结合了 Spark 的两个特性:lazy evaluation & query optimization。
作为开发人员,您可以将 Spark 操作拆分为多个步骤(就像您在方法 2 中所做的那样)。在内部,Spark(懒惰地)联合评估操作并对其应用优化。在这种情况下,Spark 可以通过列修剪来优化操作(基本上,它不会将整个 parquet 数据读入内存;只有您请求的特定列。)
创建临时视图的第三种方法就是将读取到的数据命名,以便您在以后的操作中参考。它不会改变它最初的计算方式。
有关 Spark 在读取 Parquet 时执行的优化的更多信息,请参阅此 in-depth article。
注意: 正如我在对该问题的评论中提到的,您已经在方法 2 中选择了特定的列;而另外两个读取整个数据。因为,这些本质上是不同的操作,所以在执行上会有差异。上面的答案假设在三种方法中的每一种中都执行了类似的操作(读取完整数据或从文件中读取某些特定列)。
如果您要评估其中哪“3”个最适合相同 objective,它们之间没有区别。 physical plan
说出你的问题 - 'Behind the scene?'.
方法一:
sqlDF = spark.sql("SELECT CallNumber,CallFinalDisposition FROM parquet.`/tmp/ParquetA`").show()
== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#2988 as string) AS CallNumber#3026, CallFinalDisposition#2992]
+- *(1) FileScan parquet [CallNumber#2988,CallFinalDisposition#2992] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>
方法二:
df = spark.read.parquet('/tmp/ParquetA')
df.select("CallNumber","CallFinalDisposition").show()
== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#3100 as string) AS CallNumber#3172, CallFinalDisposition#3104]
+- *(1) FileScan parquet [CallNumber#3100,CallFinalDisposition#3104] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>
方法三:
tempDF = spark.read.parquet('/tmp/ParquetA/')
tempDF.createOrReplaceTempView("temptable");
tiny = spark.sql("SELECT CallNumber,CallFinalDisposition FROM temptable").show()
== Physical Plan ==
CollectLimit 21
+- *(1) Project [cast(CallNumber#2910 as string) AS CallNumber#2982, CallFinalDisposition#2914]
+- *(1) FileScan parquet [CallNumber#2910,CallFinalDisposition#2914] Batched: true, Format: Parquet, Location: InMemoryFileIndex[dbfs:/tmp/ParquetA], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<CallNumber:int,CallFinalDisposition:string>