如何有效地将 pyspark 数据帧上传为压缩的 csv 或 parquet 文件(类似 to.gz 格式)

How to EFFICIENTLY upload a a pyspark dataframe as a zipped csv or parquet file(similiar to.gz format)

我在 S3 中有 130 GB csv.gz 文件,该文件是使用从 redshift 到 S3 的并行卸载加载的。因为它包含多个文件,所以我想减少文件的数量,以便更容易阅读我的 ML 模型(使用 sklearn)。

我已经成功地将多个从 S3 转换为 spark 数据帧(称为 spark_df),方法是: spark_df1=spark.read.csv(path,header=False,schema=schema)

spark_df1 包含 100 列(特征),是我对数百万客户 ID 的时间序列推理数据。由于它是时间序列数据,我想确保 'customerID' 的数据点应该出现在同一个输出文件中,因为我将每个分区文件作为一个块读取。 我想将此数据卸载回 S3.I 不介意较小的数据分区,但每个分区文件应该包含单个客户的整个时间序列数据。换句话说,一个客户的数据不能在 2 个文件中。

当前代码: datasink3=spark_df1.repartition(1).write.format("parquet").save(destination_path)

然而,这需要很长时间 运行 并且输出是单个文件,甚至没有压缩。我也尝试使用“.coalesce(1)”而不是“.repartition(1)”,但在我的情况下速度较慢。

您可以使用 customerID 对其进行分区:

spark_df1.partitionBy("customerID") \
         .write.format("parquet") \
         .save(destination_path)

您可以在此处阅读更多相关信息:https://sparkbyexamples.com/pyspark/pyspark-repartition-vs-partitionby/

此代码有效,运行 的时间减少到原始结果的 1/5。唯一需要注意的是,确保负载在节点之间平均分配(在我的例子中,我必须确保每个客户 ID 具有相同的行数)

spark_df1.repartition("customerID").write.partitionBy("customerID").format("csv").option("compression","gzip").save(destination_path)

添加到 manks ,您需要按 customerID 重新分区 DataFrame,然后 write.partitionBy(customerID) 为每个客户获取一个文件。 你可以看到类似的问题 .

此外,关于您关于 parquet 文件未压缩的评论,默认压缩是 snappy,与 gzip 压缩相比有一些优点和缺点,但它仍然比未压缩的好得多。