如何让 Spark 使用 Parquet 文件中的分区信息?
How to make Spark use partition information from Parquet files?
我正在尝试为某些 SparkSql 查询预先计算分区。如果我计算并保留分区,Spark 会使用它们。如果我将分区数据保存到 Parquet 并稍后重新加载它,分区信息将消失,Spark 将重新计算它。
实际数据足够大,需要花费大量时间进行分区。下面的代码充分说明了这些问题。 Test2() 目前是我唯一可以开始工作的东西,但我想快速启动实际处理,这正是 test3() 试图做的。
有人知道我做错了什么吗? ..或者如果这是 Spark 可以做的事情?
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# NOTE: Need to have python in PATH, SPARK_HOME set to location of spark, HADOOP_HOME set to location of winutils
if __name__ == "__main__":
sc = SparkContext(appName="PythonPartitionBug")
sql_text = "select foo, bar from customer c, orders o where foo < 300 and c.custkey=o.custkey"
def setup():
sqlContext = SQLContext(sc)
fields1 = [StructField(name, IntegerType()) for name in ['custkey', 'foo']]
data1 = [(1, 110), (2, 210), (3, 310), (4, 410), (5, 510)]
df1 = sqlContext.createDataFrame(data1, StructType(fields1))
df1.persist()
fields2 = [StructField(name, IntegerType()) for name in ['orderkey', 'custkey', 'bar']]
data2 = [(1, 1, 10), (2, 1, 20), (3, 2, 30), (4, 3, 40), (5, 4, 50)]
df2 = sqlContext.createDataFrame(data2, StructType(fields2))
df2.persist()
return sqlContext, df1, df2
def test1():
# Without repartition the final plan includes hashpartitioning
# == Physical Plan ==
# Project [foo#1,bar#14]
# +- SortMergeJoin [custkey#0], [custkey#13]
# :- Sort [custkey#0 ASC], false, 0
# : +- TungstenExchange hashpartitioning(custkey#0,200), None
# : +- Filter (foo#1 < 300)
# : +- InMemoryColumnarTableScan [custkey#0,foo#1], [(foo#1 < 300)], InMemoryRelation [custkey#0,foo#1], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
# +- Sort [custkey#13 ASC], false, 0
# +- TungstenExchange hashpartitioning(custkey#13,200), None
# +- InMemoryColumnarTableScan [bar#14,custkey#13], InMemoryRelation [orderkey#12,custkey#13,bar#14], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
sqlContext, df1, df2 = setup()
df1.registerTempTable("customer")
df2.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
def test2():
# With repartition the final plan does not include hashpartitioning
# == Physical Plan ==
# Project [foo#56,bar#69]
# +- SortMergeJoin [custkey#55], [custkey#68]
# :- Sort [custkey#55 ASC], false, 0
# : +- Filter (foo#56 < 300)
# : +- InMemoryColumnarTableScan [custkey#55,foo#56], [(foo#56 < 300)], InMemoryRelation [custkey#55,foo#56], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#55,4), None, None
# +- Sort [custkey#68 ASC], false, 0
# +- InMemoryColumnarTableScan [bar#69,custkey#68], InMemoryRelation [orderkey#67,custkey#68,bar#69], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#68,4), None, None
sqlContext, df1, df2 = setup()
df1a = df1.repartition(4, 'custkey').persist()
df1a.registerTempTable("customer")
df2a = df2.repartition(4, 'custkey').persist()
df2a.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
def test3():
# After round tripping the partitioned data, the partitioning is lost and spark repartitions
# == Physical Plan ==
# Project [foo#223,bar#284]
# +- SortMergeJoin [custkey#222], [custkey#283]
# :- Sort [custkey#222 ASC], false, 0
# : +- TungstenExchange hashpartitioning(custkey#222,200), None
# : +- Filter (foo#223 < 300)
# : +- InMemoryColumnarTableScan [custkey#222,foo#223], [(foo#223 < 300)], InMemoryRelation [custkey#222,foo#223], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[custkey#222,foo#223] InputPaths: file:/E:/.../df1.parquet, None
# +- Sort [custkey#283 ASC], false, 0
# +- TungstenExchange hashpartitioning(custkey#283,200), None
# +- InMemoryColumnarTableScan [bar#284,custkey#283], InMemoryRelation [orderkey#282,custkey#283,bar#284], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[orderkey#282,custkey#283,bar#284] InputPaths: file:/E:/.../df2.parquet, None
sqlContext, df1, df2 = setup()
df1a = df1.repartition(4, 'custkey').persist()
df1a.write.parquet("df1.parquet", mode='overwrite')
df1a = sqlContext.read.parquet("df1.parquet")
df1a.persist()
df1a.registerTempTable("customer")
df2a = df2.repartition(4, 'custkey').persist()
df2a.write.parquet("df2.parquet", mode='overwrite')
df2a = sqlContext.read.parquet("df2.parquet")
df2a.persist()
df2a.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
test1()
test2()
test3()
sc.stop()
你没有做错任何事 - 但你无法实现你想用 Spark 实现的目标:用于保存文件的分区程序在写入时必然 lost到磁盘。为什么?因为 Spark 没有自己的文件格式,它依赖于现有格式(例如 Parquet、ORC 或文本文件),其中 none 甚至 知道 分区程序(这是 Spark 内部的),因此他们无法保留该信息。数据在磁盘上正确分区,但Spark无法知道从磁盘加载时使用的是什么分区器,所以它别无选择,只能重新分区。
test2() 没有揭示这一点的原因是您重用了相同的 DataFrame 实例,这些实例确实存储了分区信息(在内存中)。
更好的解决方案是使用 persist(StorageLevel.MEMORY_AND_DISK_ONLY)
,如果 RDD/DF 分区从内存中被逐出,它将溢出到 Worker 的本地磁盘。这种情况下,重建分区只需要从Worker的本地磁盘拉取数据,速度比较快。
我正在尝试为某些 SparkSql 查询预先计算分区。如果我计算并保留分区,Spark 会使用它们。如果我将分区数据保存到 Parquet 并稍后重新加载它,分区信息将消失,Spark 将重新计算它。
实际数据足够大,需要花费大量时间进行分区。下面的代码充分说明了这些问题。 Test2() 目前是我唯一可以开始工作的东西,但我想快速启动实际处理,这正是 test3() 试图做的。
有人知道我做错了什么吗? ..或者如果这是 Spark 可以做的事情?
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
# NOTE: Need to have python in PATH, SPARK_HOME set to location of spark, HADOOP_HOME set to location of winutils
if __name__ == "__main__":
sc = SparkContext(appName="PythonPartitionBug")
sql_text = "select foo, bar from customer c, orders o where foo < 300 and c.custkey=o.custkey"
def setup():
sqlContext = SQLContext(sc)
fields1 = [StructField(name, IntegerType()) for name in ['custkey', 'foo']]
data1 = [(1, 110), (2, 210), (3, 310), (4, 410), (5, 510)]
df1 = sqlContext.createDataFrame(data1, StructType(fields1))
df1.persist()
fields2 = [StructField(name, IntegerType()) for name in ['orderkey', 'custkey', 'bar']]
data2 = [(1, 1, 10), (2, 1, 20), (3, 2, 30), (4, 3, 40), (5, 4, 50)]
df2 = sqlContext.createDataFrame(data2, StructType(fields2))
df2.persist()
return sqlContext, df1, df2
def test1():
# Without repartition the final plan includes hashpartitioning
# == Physical Plan ==
# Project [foo#1,bar#14]
# +- SortMergeJoin [custkey#0], [custkey#13]
# :- Sort [custkey#0 ASC], false, 0
# : +- TungstenExchange hashpartitioning(custkey#0,200), None
# : +- Filter (foo#1 < 300)
# : +- InMemoryColumnarTableScan [custkey#0,foo#1], [(foo#1 < 300)], InMemoryRelation [custkey#0,foo#1], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
# +- Sort [custkey#13 ASC], false, 0
# +- TungstenExchange hashpartitioning(custkey#13,200), None
# +- InMemoryColumnarTableScan [bar#14,custkey#13], InMemoryRelation [orderkey#12,custkey#13,bar#14], true, 10000, StorageLevel(false, true, false, false, 1), ConvertToUnsafe, None
sqlContext, df1, df2 = setup()
df1.registerTempTable("customer")
df2.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
def test2():
# With repartition the final plan does not include hashpartitioning
# == Physical Plan ==
# Project [foo#56,bar#69]
# +- SortMergeJoin [custkey#55], [custkey#68]
# :- Sort [custkey#55 ASC], false, 0
# : +- Filter (foo#56 < 300)
# : +- InMemoryColumnarTableScan [custkey#55,foo#56], [(foo#56 < 300)], InMemoryRelation [custkey#55,foo#56], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#55,4), None, None
# +- Sort [custkey#68 ASC], false, 0
# +- InMemoryColumnarTableScan [bar#69,custkey#68], InMemoryRelation [orderkey#67,custkey#68,bar#69], true, 10000, StorageLevel(false, true, false, false, 1), TungstenExchange hashpartitioning(custkey#68,4), None, None
sqlContext, df1, df2 = setup()
df1a = df1.repartition(4, 'custkey').persist()
df1a.registerTempTable("customer")
df2a = df2.repartition(4, 'custkey').persist()
df2a.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
def test3():
# After round tripping the partitioned data, the partitioning is lost and spark repartitions
# == Physical Plan ==
# Project [foo#223,bar#284]
# +- SortMergeJoin [custkey#222], [custkey#283]
# :- Sort [custkey#222 ASC], false, 0
# : +- TungstenExchange hashpartitioning(custkey#222,200), None
# : +- Filter (foo#223 < 300)
# : +- InMemoryColumnarTableScan [custkey#222,foo#223], [(foo#223 < 300)], InMemoryRelation [custkey#222,foo#223], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[custkey#222,foo#223] InputPaths: file:/E:/.../df1.parquet, None
# +- Sort [custkey#283 ASC], false, 0
# +- TungstenExchange hashpartitioning(custkey#283,200), None
# +- InMemoryColumnarTableScan [bar#284,custkey#283], InMemoryRelation [orderkey#282,custkey#283,bar#284], true, 10000, StorageLevel(false, true, false, false, 1), Scan ParquetRelation[orderkey#282,custkey#283,bar#284] InputPaths: file:/E:/.../df2.parquet, None
sqlContext, df1, df2 = setup()
df1a = df1.repartition(4, 'custkey').persist()
df1a.write.parquet("df1.parquet", mode='overwrite')
df1a = sqlContext.read.parquet("df1.parquet")
df1a.persist()
df1a.registerTempTable("customer")
df2a = df2.repartition(4, 'custkey').persist()
df2a.write.parquet("df2.parquet", mode='overwrite')
df2a = sqlContext.read.parquet("df2.parquet")
df2a.persist()
df2a.registerTempTable("orders")
df3 = sqlContext.sql(sql_text)
df3.collect()
df3.explain(True)
test1()
test2()
test3()
sc.stop()
你没有做错任何事 - 但你无法实现你想用 Spark 实现的目标:用于保存文件的分区程序在写入时必然 lost到磁盘。为什么?因为 Spark 没有自己的文件格式,它依赖于现有格式(例如 Parquet、ORC 或文本文件),其中 none 甚至 知道 分区程序(这是 Spark 内部的),因此他们无法保留该信息。数据在磁盘上正确分区,但Spark无法知道从磁盘加载时使用的是什么分区器,所以它别无选择,只能重新分区。
test2() 没有揭示这一点的原因是您重用了相同的 DataFrame 实例,这些实例确实存储了分区信息(在内存中)。
更好的解决方案是使用 persist(StorageLevel.MEMORY_AND_DISK_ONLY)
,如果 RDD/DF 分区从内存中被逐出,它将溢出到 Worker 的本地磁盘。这种情况下,重建分区只需要从Worker的本地磁盘拉取数据,速度比较快。