在 PySpark 中将多个目录中的 CSV 文件转换为镶木地板
Convert CSV files from multiple directory into parquet in PySpark
我有来自多个路径的 CSV 文件,这些路径不是 s3 存储桶中的父目录。所有 table 都具有相同的分区键。
s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...
我需要将这些 csv 文件转换为 parquet 文件并将它们存储在另一个具有相同目录结构的 s3 存储桶中。
另一个s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...
我有一个解决方案是遍历 s3 存储桶并找到 CSV 文件并将其转换为镶木地板并保存到另一个 S3 路径。我发现这种方式效率不高,因为我有一个循环,一个文件一个文件地进行转换。
想利用spark库来提高效率。
然后,我尝试了:
spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')
这种方式对每个 table 都有效,但为了进一步优化它,我想将 table_name 作为参数,例如:
TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')
谢谢
提到的question provides solutions for reading multiple files at once. The method spark.read.csv(...)
accepts one or multiple paths as shown here。对于读取文件,您可以应用相同的逻辑。虽然,在写入时,Spark 会将所有给定的 dataset/paths 合并到一个 Dataframe 中。因此,如果不先应用自定义逻辑,就不可能从一个数据帧生成多个数据帧。所以总而言之,没有这种方法可以将初始数据帧直接提取到多个目录中,即 df.write.csv(*TABLE_NAMES)
。
好消息是Spark提供了一个专门的函数input_file_name(),它return是当前记录的文件路径。您可以将它与 TABLE_NAMES 结合使用以过滤 table 名称。
这是一种可能的未经测试的 PySpark 解决方案:
from pyspark.sql.functions import input_file_name
TABLE_NAMES = [table_name_1, table_name_2, ...]
source_path = "s3n://bucket_name/"
input_paths = [f"{source_path}/{t}" for t in TABLE_NAMES]
all_df = spark.read.csv(*input_paths) \
.withColumn("file_name", input_file_name()) \
.cache()
dest_path = "s3n://another_bucket/"
def write_table(table_name: string) -> None:
all_df.where(all_df["file_name"].contains(table_name))
.write
.partitionBy('partition_key_1','partition_key_2')
.parquet(f"{dest_path}/{table_name}")
for t in TABLE_NAMES:
write_table(t)
解释:
我们生成输入路径并将其存储到 input_paths
。这将创建如下路径:s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN
.
然后我们将所有路径加载到一个数据框中,在其中添加一个名为 file_name
的新列,这将保存每一行的路径。请注意,我们在这里还使用了 cache
,这很重要,因为我们在以下代码中有多个 len(TABLE_NAMES)
操作。使用缓存将阻止我们一次又一次地加载数据源。
接下来我们创建 write_table
,它负责保存给定 table 的数据。下一步是使用 all_df["file_name"].contains(table_name)
基于 table 名称进行过滤,这将 return 仅包含 file_name
中包含 table_name
值的记录柱子。最后我们像您一样保存过滤后的数据。
在最后一步中,我们为 TABLE_NAMES
的每个项目调用 write_table
。
相关链接
我有来自多个路径的 CSV 文件,这些路径不是 s3 存储桶中的父目录。所有 table 都具有相同的分区键。
s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.csv
...
我需要将这些 csv 文件转换为 parquet 文件并将它们存储在另一个具有相同目录结构的 s3 存储桶中。
另一个s3的目录:
table_name_1/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
table_name_2/partition_key_1 = <pk_1>/partition_key_2 = <pk_2>/file.parquet
...
我有一个解决方案是遍历 s3 存储桶并找到 CSV 文件并将其转换为镶木地板并保存到另一个 S3 路径。我发现这种方式效率不高,因为我有一个循环,一个文件一个文件地进行转换。
想利用spark库来提高效率。 然后,我尝试了:
spark.read.csv('s3n://bucket_name/table_name_1/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/table_name_1')
这种方式对每个 table 都有效,但为了进一步优化它,我想将 table_name 作为参数,例如:
TABLE_NAMES = [table_name_1, table_name_2, ...]
spark.read.csv('s3n://bucket_name/{*TABLE_NAMES}/').write.partitionBy('partition_key_1', 'partition_key_2').parquet('s3n://another_bucket/{*TABLE_NAMES}')
谢谢
提到的question provides solutions for reading multiple files at once. The method spark.read.csv(...)
accepts one or multiple paths as shown here。对于读取文件,您可以应用相同的逻辑。虽然,在写入时,Spark 会将所有给定的 dataset/paths 合并到一个 Dataframe 中。因此,如果不先应用自定义逻辑,就不可能从一个数据帧生成多个数据帧。所以总而言之,没有这种方法可以将初始数据帧直接提取到多个目录中,即 df.write.csv(*TABLE_NAMES)
。
好消息是Spark提供了一个专门的函数input_file_name(),它return是当前记录的文件路径。您可以将它与 TABLE_NAMES 结合使用以过滤 table 名称。
这是一种可能的未经测试的 PySpark 解决方案:
from pyspark.sql.functions import input_file_name
TABLE_NAMES = [table_name_1, table_name_2, ...]
source_path = "s3n://bucket_name/"
input_paths = [f"{source_path}/{t}" for t in TABLE_NAMES]
all_df = spark.read.csv(*input_paths) \
.withColumn("file_name", input_file_name()) \
.cache()
dest_path = "s3n://another_bucket/"
def write_table(table_name: string) -> None:
all_df.where(all_df["file_name"].contains(table_name))
.write
.partitionBy('partition_key_1','partition_key_2')
.parquet(f"{dest_path}/{table_name}")
for t in TABLE_NAMES:
write_table(t)
解释:
我们生成输入路径并将其存储到
input_paths
。这将创建如下路径:s3n://bucket_name/table1, s3n://bucket_name/table2 ... s3n://bucket_name/tableN
.然后我们将所有路径加载到一个数据框中,在其中添加一个名为
file_name
的新列,这将保存每一行的路径。请注意,我们在这里还使用了cache
,这很重要,因为我们在以下代码中有多个len(TABLE_NAMES)
操作。使用缓存将阻止我们一次又一次地加载数据源。接下来我们创建
write_table
,它负责保存给定 table 的数据。下一步是使用all_df["file_name"].contains(table_name)
基于 table 名称进行过滤,这将 return 仅包含file_name
中包含table_name
值的记录柱子。最后我们像您一样保存过滤后的数据。在最后一步中,我们为
TABLE_NAMES
的每个项目调用write_table
。
相关链接