如何有效地将 pyspark 数据帧上传为压缩的 csv 或 parquet 文件(类似 to.gz 格式)
How to EFFICIENTLY upload a a pyspark dataframe as a zipped csv or parquet file(similiar to.gz format)
我在 S3 中有 130 GB csv.gz 文件,该文件是使用从 redshift 到 S3 的并行卸载加载的。因为它包含多个文件,所以我想减少文件的数量,以便更容易阅读我的 ML 模型(使用 sklearn)。
我已经成功地将多个从 S3 转换为 spark 数据帧(称为 spark_df),方法是:
spark_df1=spark.read.csv(path,header=False,schema=schema)
spark_df1 包含 100 列(特征),是我对数百万客户 ID 的时间序列推理数据。由于它是时间序列数据,我想确保 'customerID' 的数据点应该出现在同一个输出文件中,因为我将每个分区文件作为一个块读取。
我想将此数据卸载回 S3.I 不介意较小的数据分区,但每个分区文件应该包含单个客户的整个时间序列数据。换句话说,一个客户的数据不能在 2 个文件中。
当前代码:
datasink3=spark_df1.repartition(1).write.format("parquet").save(destination_path)
然而,这需要很长时间 运行 并且输出是单个文件,甚至没有压缩。我也尝试使用“.coalesce(1)”而不是“.repartition(1)”,但在我的情况下速度较慢。
您可以使用 customerID 对其进行分区:
spark_df1.partitionBy("customerID") \
.write.format("parquet") \
.save(destination_path)
您可以在此处阅读更多相关信息:https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/
此代码有效,运行 的时间减少到原始结果的 1/5。唯一需要注意的是,确保负载在节点之间平均分配(在我的例子中,我必须确保每个客户 ID 具有相同的行数)
spark_df1.repartition("customerID").write.partitionBy("customerID").format("csv").option("compression","gzip").save(destination_path)
添加到 manks ,您需要按 customerID 重新分区 DataFrame,然后 write.partitionBy(customerID) 为每个客户获取一个文件。
你可以看到类似的问题 .
此外,关于您关于 parquet 文件未压缩的评论,默认压缩是 snappy,与 gzip 压缩相比有一些优点和缺点,但它仍然比未压缩的好得多。
我在 S3 中有 130 GB csv.gz 文件,该文件是使用从 redshift 到 S3 的并行卸载加载的。因为它包含多个文件,所以我想减少文件的数量,以便更容易阅读我的 ML 模型(使用 sklearn)。
我已经成功地将多个从 S3 转换为 spark 数据帧(称为 spark_df),方法是:
spark_df1=spark.read.csv(path,header=False,schema=schema)
spark_df1 包含 100 列(特征),是我对数百万客户 ID 的时间序列推理数据。由于它是时间序列数据,我想确保 'customerID' 的数据点应该出现在同一个输出文件中,因为我将每个分区文件作为一个块读取。 我想将此数据卸载回 S3.I 不介意较小的数据分区,但每个分区文件应该包含单个客户的整个时间序列数据。换句话说,一个客户的数据不能在 2 个文件中。
当前代码:
datasink3=spark_df1.repartition(1).write.format("parquet").save(destination_path)
然而,这需要很长时间 运行 并且输出是单个文件,甚至没有压缩。我也尝试使用“.coalesce(1)”而不是“.repartition(1)”,但在我的情况下速度较慢。
您可以使用 customerID 对其进行分区:
spark_df1.partitionBy("customerID") \
.write.format("parquet") \
.save(destination_path)
您可以在此处阅读更多相关信息:https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/
此代码有效,运行 的时间减少到原始结果的 1/5。唯一需要注意的是,确保负载在节点之间平均分配(在我的例子中,我必须确保每个客户 ID 具有相同的行数)
spark_df1.repartition("customerID").write.partitionBy("customerID").format("csv").option("compression","gzip").save(destination_path)
添加到 manks
此外,关于您关于 parquet 文件未压缩的评论,默认压缩是 snappy,与 gzip 压缩相比有一些优点和缺点,但它仍然比未压缩的好得多。