获取spark中parquet表目录的源文件
Get source files for directory of parquet tables in spark
我有一些代码,我通过目录和通配符读取许多镶木地板表,如下所示:
df = sqlContext.read.load("some_dir/*")
有什么方法可以获取结果 DataFrame 中每一行的源文件,df
?
让我们创建一些虚拟数据并将其保存为 parquet 格式。
spark.range(1,1000).write.save("./foo/bar")
spark.range(1,2000).write.save("./foo/bar2")
spark.range(1,3000).write.save("./foo/bar3")
现在我们可以根据需要读取数据了:
import org.apache.spark.sql.functions.input_file_name
spark.read.load("./foo/*")
.select(input_file_name(), $"id")
.show(3,false)
// +---------------------------------------------------------------------------------------+---+
// |INPUT_FILE_NAME() |id |
// +---------------------------------------------------------------------------------------+---+
// |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|500|
// |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|501|
// |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|502|
// +---------------------------------------------------------------------------------------+---+
从 Spark 1.6 开始,您可以组合 parquet
数据源和 input_file_name
函数,如上所示。
在 spark 2.x 和 pyspark
之前,这似乎是错误的,但这就是它的完成方式:
from pyspark.sql.functions import input_file_name
spark.read.load("./foo/*") \
.select(input_file_name(), "id") \
.show(3,truncate=False)
# +---------------------------------------------------------------------------------------+---+
# |INPUT_FILE_NAME() |id |
# +---------------------------------------------------------------------------------------+---+
# |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|500|
# |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|501|
# |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|502|
# +---------------------------------------------------------------------------------------+---+
我有一些代码,我通过目录和通配符读取许多镶木地板表,如下所示:
df = sqlContext.read.load("some_dir/*")
有什么方法可以获取结果 DataFrame 中每一行的源文件,df
?
让我们创建一些虚拟数据并将其保存为 parquet 格式。
spark.range(1,1000).write.save("./foo/bar")
spark.range(1,2000).write.save("./foo/bar2")
spark.range(1,3000).write.save("./foo/bar3")
现在我们可以根据需要读取数据了:
import org.apache.spark.sql.functions.input_file_name
spark.read.load("./foo/*")
.select(input_file_name(), $"id")
.show(3,false)
// +---------------------------------------------------------------------------------------+---+
// |INPUT_FILE_NAME() |id |
// +---------------------------------------------------------------------------------------+---+
// |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|500|
// |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|501|
// |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|502|
// +---------------------------------------------------------------------------------------+---+
从 Spark 1.6 开始,您可以组合 parquet
数据源和 input_file_name
函数,如上所示。
在 spark 2.x 和 pyspark
之前,这似乎是错误的,但这就是它的完成方式:
from pyspark.sql.functions import input_file_name
spark.read.load("./foo/*") \
.select(input_file_name(), "id") \
.show(3,truncate=False)
# +---------------------------------------------------------------------------------------+---+
# |INPUT_FILE_NAME() |id |
# +---------------------------------------------------------------------------------------+---+
# |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|500|
# |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|501|
# |file:/home/eliasah/foo/bar/part-r-00002-9554d123-23fc-4524-a900-1cdbd9274cc3.gz.parquet|502|
# +---------------------------------------------------------------------------------------+---+