读取没有分区列名的分区列
Reading partition columns without partition column names
我们将存储在 s3 中的数据按以下结构分区:
bucket/directory/table/aaaa/bb/cc/dd/
其中 aaaa
是年,bb
是月,cc
是日,dd
是时。
如您所见,路径中没有分区键 (year=aaaa
、month=bb
、day=cc
、hour=dd)
.
因此,当我将 table 读入 Spark 时,没有 year
、month
、day
或 hour
列。
我是否可以将 table 读入 Spark 并包含分区列 而无需 :
- 更改 s3 中的路径名
- 循环遍历每个分区值并将每个分区一个一个地读取到 Spark 中(这是一个巨大的 table 并且这花费的时间太长并且显然不是最佳的)。
Spark 无法 discover partitions 路径中未编码为 partition_name=value
的内容,因此您必须创建它们。
将路径 bucket/directory/table/aaaa/bb/cc/dd/
加载到 DataFrame 后,您可以从 input_file_name()
.
获得的源文件名中提取这些分区
首先,使用 /
分隔符拆分文件名路径,然后从最后 4 个元素创建列:
from pyspark.sql import functions as F
df1 = df.withColumn("date_partitions", F.slice(F.split(F.input_file_name(), "/"), -5, 4)) \
.withColumn("year", F.col("date_partitions").getItem(0)) \
.withColumn("month", F.col("date_partitions").getItem(1)) \
.withColumn("day", F.col("date_partitions").getItem(2)) \
.withColumn("hour", F.col("date_partitions").getItem(3)) \
.drop("data_partitions")
示例:
data = [
(1, 2, "bucket/directory/table/2021/01/10/14/"),
(3, 4, "bucket/directory/table/2021/01/11/18/")
]
df = spark.createDataFrame(data, ["a", "b", "input_file_name"])
给出:
#+---+---+-------------------------------------+----+-----+---+----+
#|a |b |input_file_name |year|month|day|hour|
#+---+---+-------------------------------------+----+-----+---+----+
#|1 |2 |bucket/directory/table/2021/01/10/14/|2021|01 |10 |14 |
#|3 |4 |bucket/directory/table/2021/01/11/18/|2021|01 |11 |18 |
#+---+---+-------------------------------------+----+-----+---+----+
我们将存储在 s3 中的数据按以下结构分区:
bucket/directory/table/aaaa/bb/cc/dd/
其中 aaaa
是年,bb
是月,cc
是日,dd
是时。
如您所见,路径中没有分区键 (year=aaaa
、month=bb
、day=cc
、hour=dd)
.
因此,当我将 table 读入 Spark 时,没有 year
、month
、day
或 hour
列。
我是否可以将 table 读入 Spark 并包含分区列 而无需 :
- 更改 s3 中的路径名
- 循环遍历每个分区值并将每个分区一个一个地读取到 Spark 中(这是一个巨大的 table 并且这花费的时间太长并且显然不是最佳的)。
Spark 无法 discover partitions 路径中未编码为 partition_name=value
的内容,因此您必须创建它们。
将路径 bucket/directory/table/aaaa/bb/cc/dd/
加载到 DataFrame 后,您可以从 input_file_name()
.
首先,使用 /
分隔符拆分文件名路径,然后从最后 4 个元素创建列:
from pyspark.sql import functions as F
df1 = df.withColumn("date_partitions", F.slice(F.split(F.input_file_name(), "/"), -5, 4)) \
.withColumn("year", F.col("date_partitions").getItem(0)) \
.withColumn("month", F.col("date_partitions").getItem(1)) \
.withColumn("day", F.col("date_partitions").getItem(2)) \
.withColumn("hour", F.col("date_partitions").getItem(3)) \
.drop("data_partitions")
示例:
data = [
(1, 2, "bucket/directory/table/2021/01/10/14/"),
(3, 4, "bucket/directory/table/2021/01/11/18/")
]
df = spark.createDataFrame(data, ["a", "b", "input_file_name"])
给出:
#+---+---+-------------------------------------+----+-----+---+----+
#|a |b |input_file_name |year|month|day|hour|
#+---+---+-------------------------------------+----+-----+---+----+
#|1 |2 |bucket/directory/table/2021/01/10/14/|2021|01 |10 |14 |
#|3 |4 |bucket/directory/table/2021/01/11/18/|2021|01 |11 |18 |
#+---+---+-------------------------------------+----+-----+---+----+