Spark:保存按 "virtual" 列分区的 DataFrame
Spark: save DataFrame partitioned by "virtual" column
我正在使用 PySpark 执行经典的 ETL 作业(加载数据集、处理它、保存它)并希望将我的 Dataframe 保存为由 "virtual" 列分区的 files/directory; "virtual" 的意思是我有一个时间戳列,它是一个包含 ISO 8601 编码日期的字符串,我想按年/月/日进行分区;但我实际上在 DataFrame 中没有年、月或日列;我有这个时间戳,虽然我可以从中导出这些列,但我不希望我的结果项目序列化这些列之一。
将 DataFrame 保存到磁盘后的文件结构应如下所示:
/
year=2016/
month=01/
day=01/
part-****.gz
有没有办法用 Spark / Pyspark 做我想做的事?
用于分区的列不包含在序列化数据本身中。例如,如果您像这样创建 DataFrame
:
df = sc.parallelize([
(1, "foo", 2.0, "2016-02-16"),
(2, "bar", 3.0, "2016-02-16")
]).toDF(["id", "x", "y", "date"])
写成如下:
import tempfile
from pyspark.sql.functions import col, dayofmonth, month, year
outdir = tempfile.mktemp()
dt = col("date").cast("date")
fname = [(year, "year"), (month, "month"), (dayofmonth, "day")]
exprs = [col("*")] + [f(dt).alias(name) for f, name in fname]
(df
.select(*exprs)
.write
.partitionBy(*(name for _, name in fname))
.format("json")
.save(outdir))
单个文件将不包含分区列:
import os
(sqlContext.read
.json(os.path.join(outdir, "year=2016/month=2/day=16/"))
.printSchema())
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
分区数据仅存储在目录结构中,不会复制到序列化文件中。只有当您阅读完整或部分目录树时才会附加它:
sqlContext.read.json(outdir).printSchema()
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
## |-- year: integer (nullable = true)
## |-- month: integer (nullable = true)
## |-- day: integer (nullable = true)
sqlContext.read.json(os.path.join(outdir, "year=2016/month=2/")).printSchema()
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
## |-- day: integer (nullable = true)
我正在使用 PySpark 执行经典的 ETL 作业(加载数据集、处理它、保存它)并希望将我的 Dataframe 保存为由 "virtual" 列分区的 files/directory; "virtual" 的意思是我有一个时间戳列,它是一个包含 ISO 8601 编码日期的字符串,我想按年/月/日进行分区;但我实际上在 DataFrame 中没有年、月或日列;我有这个时间戳,虽然我可以从中导出这些列,但我不希望我的结果项目序列化这些列之一。
将 DataFrame 保存到磁盘后的文件结构应如下所示:
/
year=2016/
month=01/
day=01/
part-****.gz
有没有办法用 Spark / Pyspark 做我想做的事?
用于分区的列不包含在序列化数据本身中。例如,如果您像这样创建 DataFrame
:
df = sc.parallelize([
(1, "foo", 2.0, "2016-02-16"),
(2, "bar", 3.0, "2016-02-16")
]).toDF(["id", "x", "y", "date"])
写成如下:
import tempfile
from pyspark.sql.functions import col, dayofmonth, month, year
outdir = tempfile.mktemp()
dt = col("date").cast("date")
fname = [(year, "year"), (month, "month"), (dayofmonth, "day")]
exprs = [col("*")] + [f(dt).alias(name) for f, name in fname]
(df
.select(*exprs)
.write
.partitionBy(*(name for _, name in fname))
.format("json")
.save(outdir))
单个文件将不包含分区列:
import os
(sqlContext.read
.json(os.path.join(outdir, "year=2016/month=2/day=16/"))
.printSchema())
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
分区数据仅存储在目录结构中,不会复制到序列化文件中。只有当您阅读完整或部分目录树时才会附加它:
sqlContext.read.json(outdir).printSchema()
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
## |-- year: integer (nullable = true)
## |-- month: integer (nullable = true)
## |-- day: integer (nullable = true)
sqlContext.read.json(os.path.join(outdir, "year=2016/month=2/")).printSchema()
## root
## |-- date: string (nullable = true)
## |-- id: long (nullable = true)
## |-- x: string (nullable = true)
## |-- y: double (nullable = true)
## |-- day: integer (nullable = true)