为什么 Spark 应用程序将 DataFrame 保存到具有多个 csv 文件的 S3 存储桶

Why Spark application saves a DataFrame to S3 bucket with multi csv files

大家好,我是 Spark 和 Amazon EMR 集群的新手。

我尝试编写一个演示 spark 应用程序,可以 运行 在 Amazon EMR 集群上运行。 当 Zeppelin 笔记本上的代码 运行s 时,它 returns 输出,我认为输出将作为单个文件保存在 Amazon EMR 集群上,如下所示:

%pyspark
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
    df_new = df.withColumn('upper_c', upper(df.c))
df_new

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+

Spark 应用程序:

from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import upper
from datetime import datetime, date
import argparse

def pre_processing(output_uri):

    spark =  SparkSession.builder.appName("process sample data").getOrCreate()
    rdd = spark.sparkContext.parallelize([
        (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
        (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
        (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
    ])
    df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
    spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
    if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
        df_new = df.withColumn('upper_c', upper(df.c))
    df_new
    df_new.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument('--output_uri', help="The URI where output is saved")
    args = parser.parse_args()
    pre_processing(args.output_uri)

但是,当我 运行 它在集群上的 Spark 应用程序时,它会将多个 CSV 文件保存到 S3 存储桶。 我想知道为什么我的 Spark 应用程序将 DataFrame 保存到 S3 存储桶上的多个文件。

Spark 应用程序的参数如下:

spark-submit --deploy-mode cluster s3://<BUCKET_NAME>/spark_application/emr_demo_app.py --output_uri s3://<BUCKET_NAME>/output

提前致谢。

ps:一旦我遵循 AWS EMR 教程 page,并将示例应用程序保存为单个 CSV 文件。

Spark 使用分区的概念来并行化工作人员之间的任务。数据帧也被分区,当调用保存操作时,每个工作人员将保存一部分数据帧,创建多个文件。

为了创建单个文件,只需 repartitioncoalesce 将数据帧放入一个分区中:

df_new.repartition(1).write.option("header", "true").mode("overwrite").csv(output_uri)

所有数据都发送给工作人员,然后工作人员将记录保存到一个文件中。如果数据集太大,就会遇到瓶颈问题。 在这里可以找到类似的答案:Write single CSV file using spark-csv