重命名因 partitionBy 而创建的文件夹
Rename the folder created as a result of partitionBy
我正在为胶水上作业 运行 的时间戳添加一列。我想使用 partitionBy(load_timestamp)
保存它。例如创建了一个文件夹load_timestamp=2020-04-27 03:21:54.
我希望将文件夹命名为 table_name=2020-04-27 03:21:54.
这可能吗?
enriched = df.withColumn("load_timestamp", unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
enriched.write.partitionBy("load_timestamp").format("parquet").mode("append").save("s3://s3-enriched-bucket/" + job_statement[0])
默认情况下,Spark 根据分区列创建 directories
,即
<partition_column_name>=<value>
Easiest way
修复方法是将列名保留为 table_name
并在 partition by 子句中使用。
enriched = df.withColumn("table_name", unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
enriched.write.partitionBy("table_name").format("parquet").mode("append").save("s3://s3-enriched-bucket/" + job_statement[0])
Other way would be:
通过使用 迭代重命名目录并将 load_timestamp
更改为 table_name
。
我正在为胶水上作业 运行 的时间戳添加一列。我想使用 partitionBy(load_timestamp)
保存它。例如创建了一个文件夹load_timestamp=2020-04-27 03:21:54.
我希望将文件夹命名为 table_name=2020-04-27 03:21:54.
这可能吗?
enriched = df.withColumn("load_timestamp", unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
enriched.write.partitionBy("load_timestamp").format("parquet").mode("append").save("s3://s3-enriched-bucket/" + job_statement[0])
默认情况下,Spark 根据分区列创建 directories
,即
<partition_column_name>=<value>
Easiest way
修复方法是将列名保留为 table_name
并在 partition by 子句中使用。
enriched = df.withColumn("table_name", unix_timestamp(lit(timestamp),'yyyy-MM-dd HH:mm:ss').cast("timestamp"))
enriched.write.partitionBy("table_name").format("parquet").mode("append").save("s3://s3-enriched-bucket/" + job_statement[0])
Other way would be:
通过使用 load_timestamp
更改为 table_name
。