小文件的火花重新分区数据

spark repartition data for small file

我是 Spark 的新手,我使用集群主要是为了并行化。我有一个 100MB 的文件,其中的每一行都经过某种算法处理,这是一个相当繁重且漫长的处理过程。

我想使用 10 节点集群并并行处理。我知道块大小超过 100MB,我尝试重新分区 textFile。如果我理解的好,这个repartition方法增加分区数:

JavaRDD<String> input = sc.textFile(args[0]);
input.repartition(10);

问题是当我部署到集群时,只有一个节点在有效处理。我怎样才能设法并行处理文件?

更新 1: 这是我的 spark-submit 命令:

/usr/bin/spark-submit --master yarn --class mypackage.myclass --jars 
myjar.jar 
gs://mybucket/input.txt outfile

更新二: 分区后,基本有2个操作:

JavaPairRDD<String, String> int_input = mappingToPair(input);
JavaPairRDD<String, String> output = mappingValues(int_input, option);
output.saveAsTextFile("hdfs://...");

其中 mappingToPair(...)

public JavaPairRDD<String, String> mappingToPair(JavaRDD<String> input){
        return input.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String line) {
                String[] arrayList = line.split("\t", 2);
                return new Tuple2(arrayList[0], arrayList[1]);
            }
        });
    }

mappingValues(...)是以下类型的方法:

public JavaPairRDD<String,String> mappingValues(JavaPairRDD<String,String> rdd, final String option){
        return rdd.mapValues(
                new Function<String, String>() {
                    // here the algo processing takes place...
                }
        )
}

这里可能存在多个问题:

  1. 文件只有一个块大。使用多个执行程序读取它根本没有用,因为 HDFS 节点可以全速服务一个节点,或者以一半速度(加上开销)服务两个节点,等等。当您执行时,执行程序计数变得有用(对于读取步骤)有多个块分散在不同的 HDFS 节点上。
  2. 您也可能以不可拆分的压缩格式存储文件,因此输入步骤只能用一个执行器读取它,即使它是块大小的 100 倍。
  3. 您没有将 repartition(10) 调用链接到您的流程中,因此它根本无效。如果将这一行:input.repartition(10); 替换为这一行:input = input.repartition(10); 它将被使用,并且在继续下一步之前应该将 RDD 拆分为多个。

请注意,重新分区会使您的过程变得更长,因为数据必须拆分并传输到其他机器,这很容易因网络速度慢而成为瓶颈。

当您使用客户端部署模式时尤其如此。这意味着第一个执行者(驱动程序)是您提交的本地 Spark 实例。所以它会先从集群中下载所有的数据到driver,分区之后再上传回其他YARN节点。

我可以继续讲这个,但我想说的主要是:如果你的算法非常简单,而不是分区、传输,这个过程甚至可能 运行 在一个执行器上更快,然后 运行 在所有执行器上并行执行算法。