PySpark 将两个数据帧写入同一分区但按文件夹分隔
PySpark writing two dataframes to the same partition but separated by folder
我正在使用 Spark 将两个不同的数据帧写入同一个分区,但我希望它们在分区末尾用文件夹分隔。即第一个数据帧将写入 yyyy/mm/dd/
,第二个将写入 yyyy/mm/dd/rejected/
目前,我可以使用以下代码将第一个数据帧写入 yyyy/mm/dd/
并将第二个数据帧写入 rejected/yyyy/mm/dd
:
first_df.repartition('year', 'month', 'day').write \
.partitionBy('year', 'month', 'day') \
.mode("append") \
.csv(f"{output_path}/")
second_df.repartition('year', 'month', 'day').write \
.partitionBy('year', 'month', 'day') \
.mode("append") \
.csv(f"{output_path}/rejected")
感谢任何建议
将 rejected
作为文字值添加到 second_df
然后包含在 partitionBy
即
second_df.withColumn("rej",lit("rejected")) \
.repartition('year', 'month', 'day').write \
.partitionBy('year', 'month', 'day','rej') \
.mode("append") \
.csv(f"{output_path}")
其他方法是使用 将文件移动到相关目录中。
Update:
Rename the directory:
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(URI("hdfs://<name_node>:8020"), Configuration())
#rename the directory
fs.rename(Path(f'{output_path}/rej=rejected'),Path(f'{output_path}/rejected'))
我正在使用 Spark 将两个不同的数据帧写入同一个分区,但我希望它们在分区末尾用文件夹分隔。即第一个数据帧将写入 yyyy/mm/dd/
,第二个将写入 yyyy/mm/dd/rejected/
目前,我可以使用以下代码将第一个数据帧写入 yyyy/mm/dd/
并将第二个数据帧写入 rejected/yyyy/mm/dd
:
first_df.repartition('year', 'month', 'day').write \
.partitionBy('year', 'month', 'day') \
.mode("append") \
.csv(f"{output_path}/")
second_df.repartition('year', 'month', 'day').write \
.partitionBy('year', 'month', 'day') \
.mode("append") \
.csv(f"{output_path}/rejected")
感谢任何建议
将 rejected
作为文字值添加到 second_df
然后包含在 partitionBy
即
second_df.withColumn("rej",lit("rejected")) \
.repartition('year', 'month', 'day').write \
.partitionBy('year', 'month', 'day','rej') \
.mode("append") \
.csv(f"{output_path}")
其他方法是使用
Update:
Rename the directory:
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(URI("hdfs://<name_node>:8020"), Configuration())
#rename the directory
fs.rename(Path(f'{output_path}/rej=rejected'),Path(f'{output_path}/rejected'))