将文件从一个镶木地板分区移动到另一个
Moving files from one parquet partition to another
我的 S3 存储桶中有大量数据,分为两列 MODULE
和 DATE
这样我的镶木地板的文件结构是:
s3://my_bucket/path/file.parquet/MODULE='XYZ'/DATE=2020-01-01
我有 7 个 MODULE
,DATE
的范围从 2020-01-01
到 2020-09-01
。
我发现数据存在差异,需要更正其中一个模块的 MODULE
条目。基本上我需要将属于 MODULE
XYZ 的特定索引号的所有数据更改为 MODULE
ABC。
我可以通过加载数据框并执行以下操作在 pyspark 中执行此操作:
df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))
但是我如何重新分区以便只有那些更改的条目才会移动到 ABC MODULE
分区?如果我做类似的事情:
df.mode('append').partitionBy('MODULE','DATE').parquet(s3://my_bucket/path/file.parquet")
我会将数据与错误的 MODULE
数据一起添加。另外,我有将近一年的数据,不想重新分区整个数据集,因为这会花费很长时间。
有办法吗?
IIUC 您可以通过过滤该特定索引的数据然后将该数据与日期一起保存为分区来做到这一点。
df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))
df = df.filter(col('index')==34)
df.mode('overwrite').partitionBy('DATE').parquet(s3://my_bucket/path/ABC/")
通过这种方式,您最终只会修改更改的模块,即 ABC
如果我理解得很好,分区 MODULE=XYZ
中有数据应该移动到 MODULE=ABC
。
首先,确定受影响的文件。
from pyspark.sql import functions as F
file_list = df.where(F.col("index") == 34).select(
F.input_file_name()
).distinct().collect()
然后,你创建一个仅基于这些文件的数据框,你用它来完成这两个 MODULE
。
df = spark.read.parquet(file_list).withColumn(
"MODULE", when(col("index") == 34, "ABC").otherwise(col("MODULE"))
)
df.write.parquet(
"s3://my_bucket/path/ABC/", mode="append", partitionBy=["MODULE", "DATE"]
)
此时ABC应该可以了(你刚刚补上了缺失的数据),但是XYZ应该是错的,因为有重复的数据。要恢复 XYZ,您只需删除 file_list
.
中的文件列表
我的 S3 存储桶中有大量数据,分为两列 MODULE
和 DATE
这样我的镶木地板的文件结构是:
s3://my_bucket/path/file.parquet/MODULE='XYZ'/DATE=2020-01-01
我有 7 个 MODULE
,DATE
的范围从 2020-01-01
到 2020-09-01
。
我发现数据存在差异,需要更正其中一个模块的 MODULE
条目。基本上我需要将属于 MODULE
XYZ 的特定索引号的所有数据更改为 MODULE
ABC。
我可以通过加载数据框并执行以下操作在 pyspark 中执行此操作:
df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))
但是我如何重新分区以便只有那些更改的条目才会移动到 ABC MODULE
分区?如果我做类似的事情:
df.mode('append').partitionBy('MODULE','DATE').parquet(s3://my_bucket/path/file.parquet")
我会将数据与错误的 MODULE
数据一起添加。另外,我有将近一年的数据,不想重新分区整个数据集,因为这会花费很长时间。
有办法吗?
IIUC 您可以通过过滤该特定索引的数据然后将该数据与日期一起保存为分区来做到这一点。
df=df.withColumn('MODULE', when(col('index')==34, "ABC").otherwise(col('MODULE')))
df = df.filter(col('index')==34)
df.mode('overwrite').partitionBy('DATE').parquet(s3://my_bucket/path/ABC/")
通过这种方式,您最终只会修改更改的模块,即 ABC
如果我理解得很好,分区 MODULE=XYZ
中有数据应该移动到 MODULE=ABC
。
首先,确定受影响的文件。
from pyspark.sql import functions as F
file_list = df.where(F.col("index") == 34).select(
F.input_file_name()
).distinct().collect()
然后,你创建一个仅基于这些文件的数据框,你用它来完成这两个 MODULE
。
df = spark.read.parquet(file_list).withColumn(
"MODULE", when(col("index") == 34, "ABC").otherwise(col("MODULE"))
)
df.write.parquet(
"s3://my_bucket/path/ABC/", mode="append", partitionBy=["MODULE", "DATE"]
)
此时ABC应该可以了(你刚刚补上了缺失的数据),但是XYZ应该是错的,因为有重复的数据。要恢复 XYZ,您只需删除 file_list
.