为什么 Spark 应用程序将 DataFrame 保存到具有多个 csv 文件的 S3 存储桶
Why Spark application saves a DataFrame to S3 bucket with multi csv files
大家好,我是 Spark 和 Amazon EMR 集群的新手。
我尝试编写一个演示 spark 应用程序,可以 运行 在 Amazon EMR 集群上运行。
当 Zeppelin 笔记本上的代码 运行s 时,它 returns 输出,我认为输出将作为单个文件保存在 Amazon EMR 集群上,如下所示:
%pyspark
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
df_new = df.withColumn('upper_c', upper(df.c))
df_new
+---+---+-------+----------+-------------------+-------+
| a| b| c| d| e|upper_c|
+---+---+-------+----------+-------------------+-------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
Spark 应用程序:
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import upper
from datetime import datetime, date
import argparse
def pre_processing(output_uri):
spark = SparkSession.builder.appName("process sample data").getOrCreate()
rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
df_new = df.withColumn('upper_c', upper(df.c))
df_new
df_new.write.option("header", "true").mode("overwrite").csv(output_uri)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--output_uri', help="The URI where output is saved")
args = parser.parse_args()
pre_processing(args.output_uri)
但是,当我 运行 它在集群上的 Spark 应用程序时,它会将多个 CSV 文件保存到 S3 存储桶。
我想知道为什么我的 Spark 应用程序将 DataFrame 保存到 S3 存储桶上的多个文件。
Spark 应用程序的参数如下:
spark-submit --deploy-mode cluster s3://<BUCKET_NAME>/spark_application/emr_demo_app.py --output_uri s3://<BUCKET_NAME>/output
提前致谢。
ps:一旦我遵循 AWS EMR 教程 page,并将示例应用程序保存为单个 CSV 文件。
Spark 使用分区的概念来并行化工作人员之间的任务。数据帧也被分区,当调用保存操作时,每个工作人员将保存一部分数据帧,创建多个文件。
为了创建单个文件,只需 repartition
或 coalesce
将数据帧放入一个分区中:
df_new.repartition(1).write.option("header", "true").mode("overwrite").csv(output_uri)
所有数据都发送给工作人员,然后工作人员将记录保存到一个文件中。如果数据集太大,就会遇到瓶颈问题。
在这里可以找到类似的答案:Write single CSV file using spark-csv
大家好,我是 Spark 和 Amazon EMR 集群的新手。
我尝试编写一个演示 spark 应用程序,可以 运行 在 Amazon EMR 集群上运行。 当 Zeppelin 笔记本上的代码 运行s 时,它 returns 输出,我认为输出将作为单个文件保存在 Amazon EMR 集群上,如下所示:
%pyspark
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
df_new = df.withColumn('upper_c', upper(df.c))
df_new
+---+---+-------+----------+-------------------+-------+
| a| b| c| d| e|upper_c|
+---+---+-------+----------+-------------------+-------+
| 1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
| 2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
| 3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+
Spark 应用程序:
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql.functions import upper
from datetime import datetime, date
import argparse
def pre_processing(output_uri):
spark = SparkSession.builder.appName("process sample data").getOrCreate()
rdd = spark.sparkContext.parallelize([
(1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
(2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
(3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
if type(df.c) == type(upper(df.c)) == type(df.c.isNull()):
df_new = df.withColumn('upper_c', upper(df.c))
df_new
df_new.write.option("header", "true").mode("overwrite").csv(output_uri)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument('--output_uri', help="The URI where output is saved")
args = parser.parse_args()
pre_processing(args.output_uri)
但是,当我 运行 它在集群上的 Spark 应用程序时,它会将多个 CSV 文件保存到 S3 存储桶。 我想知道为什么我的 Spark 应用程序将 DataFrame 保存到 S3 存储桶上的多个文件。
Spark 应用程序的参数如下:
spark-submit --deploy-mode cluster s3://<BUCKET_NAME>/spark_application/emr_demo_app.py --output_uri s3://<BUCKET_NAME>/output
提前致谢。
ps:一旦我遵循 AWS EMR 教程 page,并将示例应用程序保存为单个 CSV 文件。
Spark 使用分区的概念来并行化工作人员之间的任务。数据帧也被分区,当调用保存操作时,每个工作人员将保存一部分数据帧,创建多个文件。
为了创建单个文件,只需 repartition
或 coalesce
将数据帧放入一个分区中:
df_new.repartition(1).write.option("header", "true").mode("overwrite").csv(output_uri)
所有数据都发送给工作人员,然后工作人员将记录保存到一个文件中。如果数据集太大,就会遇到瓶颈问题。 在这里可以找到类似的答案:Write single CSV file using spark-csv