PySpark 输出文件数
PySpark Number of Output Files
我是 Spark 新手。我有一个简单的 pyspark 脚本。它读取一个 json 文件,将其展平并将其作为 parquet 压缩文件写入 S3 位置。
读取和转换步骤 运行 非常快并使用 50 个执行程序(我在 conf 中设置)。但是写入阶段耗时较长,只写入一个大文件(480MB)。
保存文件的数量是如何决定的?
可以以某种方式加快写入操作吗?
谢谢,
拉姆.
输出的文件数等于保存的RDD的分区数。在此示例中,RDD 被重新分区以控制输出文件的数量。
尝试:
repartition(numPartitions) - Reshuffle the data in the RDD randomly
to create either more or fewer partitions and balance it across them.
This always shuffles all data over the network.
>>> dataRDD.repartition(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_test")
输出的文件数与RDD的分区数相同
$ hadoop fs -ls /user/cloudera/sqoop_import/orders_test
Found 3 items
-rw-r--r-- 1 cloudera cloudera 0 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 1499519 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00000
-rw-r--r-- 1 cloudera cloudera 1500425 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00001
同时检查一下:coalesce(numPartitions)
更新:
The textFile method also takes an optional second argument for
controlling the number of partitions of the file. By default, Spark
creates one partition for each block of the file (blocks being 64MB by
default in HDFS), but you can also ask for a higher number of
partitions by passing a larger value. Note that you cannot have fewer
partitions than blocks.
...但这是可能的分区的最小数量,因此不能保证它们。
所以如果你想在读取时分区,你应该使用这个....
dataRDD=sc.textFile("/user/cloudera/sqoop_import/orders").repartition(2)
有两件事需要考虑:-
HDFS 块大小:- HDFS 的块大小可在 HDFS 中配置-site.xml(默认为 128 Mb)。如果文件的大小大于块大小,则会在内存中为文件数据的其余部分分配一个新块。但是,那不是你能看到的。它是在内部完成的。整个过程是有顺序的。
分区:- 当 Spark 出现时,并行性也会出现。理想情况下,如果您不手动提供分区数,它将等于默认配置中的块大小。另一方面,如果你想自定义分区文件的数量,你可以继续使用 API ,其中 n 是分区的数量。
当您浏览 HDFS 时,这些分区对您可见。
此外,为了提高性能,您可以在 spark-submit / pyspark /spark-shell 时给出一些规范,例如 num executors、executor memory、per executor 的 cores 等。写入任何文件时的性能很大程度上取决于用于相同文件的格式和压缩编解码器。
感谢阅读。
我是 Spark 新手。我有一个简单的 pyspark 脚本。它读取一个 json 文件,将其展平并将其作为 parquet 压缩文件写入 S3 位置。
读取和转换步骤 运行 非常快并使用 50 个执行程序(我在 conf 中设置)。但是写入阶段耗时较长,只写入一个大文件(480MB)。
保存文件的数量是如何决定的? 可以以某种方式加快写入操作吗?
谢谢, 拉姆.
输出的文件数等于保存的RDD的分区数。在此示例中,RDD 被重新分区以控制输出文件的数量。
尝试:
repartition(numPartitions) - Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
>>> dataRDD.repartition(2).saveAsTextFile("/user/cloudera/sqoop_import/orders_test")
输出的文件数与RDD的分区数相同
$ hadoop fs -ls /user/cloudera/sqoop_import/orders_test
Found 3 items
-rw-r--r-- 1 cloudera cloudera 0 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 1499519 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00000
-rw-r--r-- 1 cloudera cloudera 1500425 2016-12-28 12:52 /user/cloudera/sqoop_import/orders_test/part-00001
同时检查一下:coalesce(numPartitions)
更新:
The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
...但这是可能的分区的最小数量,因此不能保证它们。
所以如果你想在读取时分区,你应该使用这个....
dataRDD=sc.textFile("/user/cloudera/sqoop_import/orders").repartition(2)
有两件事需要考虑:-
HDFS 块大小:- HDFS 的块大小可在 HDFS 中配置-site.xml(默认为 128 Mb)。如果文件的大小大于块大小,则会在内存中为文件数据的其余部分分配一个新块。但是,那不是你能看到的。它是在内部完成的。整个过程是有顺序的。
分区:- 当 Spark 出现时,并行性也会出现。理想情况下,如果您不手动提供分区数,它将等于默认配置中的块大小。另一方面,如果你想自定义分区文件的数量,你可以继续使用 API ,其中 n 是分区的数量。 当您浏览 HDFS 时,这些分区对您可见。
此外,为了提高性能,您可以在 spark-submit / pyspark /spark-shell 时给出一些规范,例如 num executors、executor memory、per executor 的 cores 等。写入任何文件时的性能很大程度上取决于用于相同文件的格式和压缩编解码器。
感谢阅读。