从多个 S3 存储桶导入 pyspark 数据框,其中一列表示条目来自哪个存储桶
Import pyspark dataframe from multiple S3 buckets, with a column denoting which bucket the entry came from
我有一个按日期分区的 S3 存储桶列表。第一个桶名为 2019-12-1,第二个桶名为 2019-12-2,依此类推
这些桶中的每一个都将我正在读入 pyspark 数据帧的镶木地板文件存储起来。从每个存储桶生成的 pyspark 数据帧具有完全相同的架构。我想做的是遍历这些存储桶,并将所有这些镶木地板文件存储到一个 pyspark 数据框中,该数据框中有一个日期列,表示数据框中每个条目实际来自哪个存储桶。
因为分别导入每个桶时生成的数据帧的模式有很多层深(即每一行包含结构数组的结构等),我想将所有桶组合成一个数据帧的唯一方法是有一个带有单个 'dates' 列的数据框。 'dates' 列的每一行都将包含该日期相应 S3 存储桶的内容。
我可以通过这一行读取所有日期:
df = spark.read.parquet("s3://my_bucket/*")
我看到有人通过向该行添加一个 'withColumn' 调用来创建一个 'dates' 列来实现我正在描述的内容,但我不记得是如何实现的。
使用 input_file_name()
您可以从文件路径中提取 S3 存储桶名称:
df.withColumn("dates", split(regexp_replace(input_file_name(), "s3://", ""), "/").getItem(0))\
.show()
我们拆分文件名并得到与存储桶名称对应的第一部分。
也可以使用正则表达式 s3:\/\/(.+?)\/(.+)
,第一组是存储桶名称:
df.withColumn("dates", regexp_extract(input_file_name(), "s3:\/\/(.+?)\/(.+)", 1)).show()
我有一个按日期分区的 S3 存储桶列表。第一个桶名为 2019-12-1,第二个桶名为 2019-12-2,依此类推
这些桶中的每一个都将我正在读入 pyspark 数据帧的镶木地板文件存储起来。从每个存储桶生成的 pyspark 数据帧具有完全相同的架构。我想做的是遍历这些存储桶,并将所有这些镶木地板文件存储到一个 pyspark 数据框中,该数据框中有一个日期列,表示数据框中每个条目实际来自哪个存储桶。
因为分别导入每个桶时生成的数据帧的模式有很多层深(即每一行包含结构数组的结构等),我想将所有桶组合成一个数据帧的唯一方法是有一个带有单个 'dates' 列的数据框。 'dates' 列的每一行都将包含该日期相应 S3 存储桶的内容。
我可以通过这一行读取所有日期:
df = spark.read.parquet("s3://my_bucket/*")
我看到有人通过向该行添加一个 'withColumn' 调用来创建一个 'dates' 列来实现我正在描述的内容,但我不记得是如何实现的。
使用 input_file_name()
您可以从文件路径中提取 S3 存储桶名称:
df.withColumn("dates", split(regexp_replace(input_file_name(), "s3://", ""), "/").getItem(0))\
.show()
我们拆分文件名并得到与存储桶名称对应的第一部分。
也可以使用正则表达式 s3:\/\/(.+?)\/(.+)
,第一组是存储桶名称:
df.withColumn("dates", regexp_extract(input_file_name(), "s3:\/\/(.+?)\/(.+)", 1)).show()