重命名因 partitionBy 而创建的文件夹

Rename the folder created as a result of partitionBy

我正在为胶水上作业 运行 的时间戳添加一列。我想使用 partitionBy(load_timestamp) 保存它。例如创建了一个文件夹load_timestamp=2020-04-27 03:21:54. 我希望将文件夹命名为 table_name=2020-04-27 03:21:54. 这可能吗?

enriched = df.withColumn("load_timestamp", unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
enriched.write.partitionBy("load_timestamp").format("parquet").mode("append").save("s3://s3-enriched-bucket/" + job_statement[0])

默认情况下,Spark 根据分区列创建 directories,即

<partition_column_name>=<value>

Easiest way 修复方法是将列名保留为 table_name 并在 partition by 子句中使用。

enriched = df.withColumn("table_name", unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))

enriched.write.partitionBy("table_name").format("parquet").mode("append").save("s3://s3-enriched-bucket/" + job_statement[0])

Other way would be:

通过使用 迭代重命名目录并将 load_timestamp 更改为 table_name