小文件的火花重新分区数据
spark repartition data for small file
我是 Spark
的新手,我使用集群主要是为了并行化。我有一个 100MB 的文件,其中的每一行都经过某种算法处理,这是一个相当繁重且漫长的处理过程。
我想使用 10 节点集群并并行处理。我知道块大小超过 100MB
,我尝试重新分区 textFile
。如果我理解的好,这个repartition
方法增加分区数:
JavaRDD<String> input = sc.textFile(args[0]);
input.repartition(10);
问题是当我部署到集群时,只有一个节点在有效处理。我怎样才能设法并行处理文件?
更新 1: 这是我的 spark-submit
命令:
/usr/bin/spark-submit --master yarn --class mypackage.myclass --jars
myjar.jar
gs://mybucket/input.txt outfile
更新二: 分区后,基本有2个操作:
JavaPairRDD<String, String> int_input = mappingToPair(input);
JavaPairRDD<String, String> output = mappingValues(int_input, option);
output.saveAsTextFile("hdfs://...");
其中 mappingToPair(...)
是
public JavaPairRDD<String, String> mappingToPair(JavaRDD<String> input){
return input.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String line) {
String[] arrayList = line.split("\t", 2);
return new Tuple2(arrayList[0], arrayList[1]);
}
});
}
和mappingValues(...)
是以下类型的方法:
public JavaPairRDD<String,String> mappingValues(JavaPairRDD<String,String> rdd, final String option){
return rdd.mapValues(
new Function<String, String>() {
// here the algo processing takes place...
}
)
}
这里可能存在多个问题:
- 文件只有一个块大。使用多个执行程序读取它根本没有用,因为 HDFS 节点可以全速服务一个节点,或者以一半速度(加上开销)服务两个节点,等等。当您执行时,执行程序计数变得有用(对于读取步骤)有多个块分散在不同的 HDFS 节点上。
- 您也可能以不可拆分的压缩格式存储文件,因此输入步骤只能用一个执行器读取它,即使它是块大小的 100 倍。
- 您没有将
repartition(10)
调用链接到您的流程中,因此它根本无效。如果将这一行:input.repartition(10);
替换为这一行:input = input.repartition(10);
它将被使用,并且在继续下一步之前应该将 RDD 拆分为多个。
请注意,重新分区会使您的过程变得更长,因为数据必须拆分并传输到其他机器,这很容易因网络速度慢而成为瓶颈。
当您使用客户端部署模式时尤其如此。这意味着第一个执行者(驱动程序)是您提交的本地 Spark 实例。所以它会先从集群中下载所有的数据到driver,分区之后再上传回其他YARN节点。
我可以继续讲这个,但我想说的主要是:如果你的算法非常简单,而不是分区、传输,这个过程甚至可能 运行 在一个执行器上更快,然后 运行 在所有执行器上并行执行算法。
我是 Spark
的新手,我使用集群主要是为了并行化。我有一个 100MB 的文件,其中的每一行都经过某种算法处理,这是一个相当繁重且漫长的处理过程。
我想使用 10 节点集群并并行处理。我知道块大小超过 100MB
,我尝试重新分区 textFile
。如果我理解的好,这个repartition
方法增加分区数:
JavaRDD<String> input = sc.textFile(args[0]);
input.repartition(10);
问题是当我部署到集群时,只有一个节点在有效处理。我怎样才能设法并行处理文件?
更新 1: 这是我的 spark-submit
命令:
/usr/bin/spark-submit --master yarn --class mypackage.myclass --jars
myjar.jar
gs://mybucket/input.txt outfile
更新二: 分区后,基本有2个操作:
JavaPairRDD<String, String> int_input = mappingToPair(input);
JavaPairRDD<String, String> output = mappingValues(int_input, option);
output.saveAsTextFile("hdfs://...");
其中 mappingToPair(...)
是
public JavaPairRDD<String, String> mappingToPair(JavaRDD<String> input){
return input.mapToPair(new PairFunction<String, String, String>() {
public Tuple2<String, String> call(String line) {
String[] arrayList = line.split("\t", 2);
return new Tuple2(arrayList[0], arrayList[1]);
}
});
}
和mappingValues(...)
是以下类型的方法:
public JavaPairRDD<String,String> mappingValues(JavaPairRDD<String,String> rdd, final String option){
return rdd.mapValues(
new Function<String, String>() {
// here the algo processing takes place...
}
)
}
这里可能存在多个问题:
- 文件只有一个块大。使用多个执行程序读取它根本没有用,因为 HDFS 节点可以全速服务一个节点,或者以一半速度(加上开销)服务两个节点,等等。当您执行时,执行程序计数变得有用(对于读取步骤)有多个块分散在不同的 HDFS 节点上。
- 您也可能以不可拆分的压缩格式存储文件,因此输入步骤只能用一个执行器读取它,即使它是块大小的 100 倍。
- 您没有将
repartition(10)
调用链接到您的流程中,因此它根本无效。如果将这一行:input.repartition(10);
替换为这一行:input = input.repartition(10);
它将被使用,并且在继续下一步之前应该将 RDD 拆分为多个。
请注意,重新分区会使您的过程变得更长,因为数据必须拆分并传输到其他机器,这很容易因网络速度慢而成为瓶颈。
当您使用客户端部署模式时尤其如此。这意味着第一个执行者(驱动程序)是您提交的本地 Spark 实例。所以它会先从集群中下载所有的数据到driver,分区之后再上传回其他YARN节点。
我可以继续讲这个,但我想说的主要是:如果你的算法非常简单,而不是分区、传输,这个过程甚至可能 运行 在一个执行器上更快,然后 运行 在所有执行器上并行执行算法。