How does SparkContext.textFile work under the covers?

I am trying to understand the textFile method in depth, but I think my lack of Hadoop knowledge is holding me back here. Let me lay out my understanding and maybe you can correct anything that is incorrect.

When sc.textFile(path) is called, defaultMinPartitions is used, which is really just math.min(taskScheduler.defaultParallelism, 2). Let's assume we are using SparkDeploySchedulerBackend, where this is:

conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(),
2))

So, now let's say the default is 2. Going back to textFile, this is passed to HadoopRDD. The true size is determined in getPartitions() using inputFormat.getSplits(jobConf, minPartitions). But, from what I can tell, the partitions value is just a hint and is in fact mostly ignored, so you will probably end up with the total number of blocks.
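As a minimal sketch of the kind of check I mean (the path is made up), the second argument seems to be only a lower bound on the resulting partition count:

// minPartitions is passed down to getSplits as a hint only, so the actual
// partition count usually tracks the number of HDFS blocks.
val rdd = sc.textFile("hdfs:///some/large/file.txt", 5) // ask for at least 5 partitions
println(rdd.partitions.length)                          // typically the block count instead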

OK, so this fits with expectations. But what if the default is not used and you provide a partition size that is larger than the block size? If my research is right, the getSplits call simply ignores that parameter, so wouldn't the provided minimum end up being ignored, and you would still just get the block size?

Cross posted with the spark mailing list

Short version:

The split size is determined by mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize. If it is larger than HDFS's blockSize, multiple blocks within the same file will be combined into a single split.
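For example (a hedged sketch from the Spark side; the path and the 256 MB figure are made up, and sc is assumed to be an existing SparkContext), raising the minimum split size above the HDFS block size should give you fewer, larger partitions:

// Sketch: ask FileInputFormat for splits of at least 256 MB, larger than a
// typical 128 MB block size, so several blocks of one file land in one split.
sc.hadoopConfiguration.setLong("mapred.min.split.size", 256L * 1024 * 1024)
val merged = sc.textFile("hdfs:///data/big.txt")
println(merged.partitions.length)  // fewer partitions than the file has blocks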

Long version:

I think you understand the procedure before inputFormat.getSplits correctly.

Inside inputFormat.getSplits, or more specifically, inside FileInputFormat's getSplits, it is mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize that ultimately decides the split size. (I am not sure which one is effective in Spark; I would tend to believe the former.)

Let's look at the code (FileInputFormat from Hadoop 2.4.0):

long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();

for (FileStatus file: files) {
  Path path = file.getPath();
  long length = file.getLen();
  if (length != 0) {
    FileSystem fs = path.getFileSystem(job);
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    if (isSplitable(fs, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;
      while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations,
            length-bytesRemaining, splitSize, clusterMap);
        splits.add(makeSplit(path, length-bytesRemaining, splitSize,
            splitHosts));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        String[] splitHosts = getSplitHosts(blkLocations, length
            - bytesRemaining, bytesRemaining, clusterMap);
        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
            splitHosts));
      }
    } else {
      String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap);
      splits.add(makeSplit(path, 0, length, splitHosts));
    }
  } else { 
    //Create empty hosts array for zero length files
    splits.add(makeSplit(path, 0, length, new String[0]));
  }
}

Inside the for loop, makeSplit() is used to generate each split, and splitSize is the effective split size. The computeSplitSize function produces splitSize:

protected long computeSplitSize(long goalSize, long minSize,
                                   long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}

Therefore, if minSplitSize > blockSize, each output split is actually a combination of several blocks of the same HDFS file; conversely, if minSplitSize < blockSize, each split corresponds to exactly one HDFS block.
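To make the arithmetic concrete, here is a small standalone sketch (not Hadoop code; the 300 MB file, 128 MB block size and 256 MB minSize are made-up numbers) that mirrors the loop and computeSplitSize above for a single file, once with the default minSize and once with minSize raised above the block size:

object SplitSizeSketch {
  // Same slack factor FileInputFormat uses before it stops carving full-size splits.
  val SPLIT_SLOP = 1.1

  def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
    math.max(minSize, math.min(goalSize, blockSize))

  // Mirrors the while loop in getSplits: emit (offset, length) pairs for one file.
  def listSplits(length: Long, splitSize: Long): Seq[(Long, Long)] = {
    var bytesRemaining = length
    val splits = scala.collection.mutable.ArrayBuffer.empty[(Long, Long)]
    while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
      splits += ((length - bytesRemaining, splitSize))
      bytesRemaining -= splitSize
    }
    if (bytesRemaining != 0) splits += ((length - bytesRemaining, bytesRemaining))
    splits
  }

  def main(args: Array[String]): Unit = {
    val length    = 300L * 1024 * 1024   // one 300 MB file
    val blockSize = 128L * 1024 * 1024   // dfs.block.size
    val goalSize  = length / 2           // totalSize / numSplits, with numSplits = 2

    // Default minSize (1 byte): splitSize == blockSize, roughly one split per block.
    println(listSplits(length, computeSplitSize(goalSize, 1L, blockSize)))
    // => 3 splits: 128 MB, 128 MB, 44 MB

    // minSize raised to 256 MB (> blockSize): blocks are merged into bigger splits.
    println(listSplits(length, computeSplitSize(goalSize, 256L * 1024 * 1024, blockSize)))
    // => 2 splits: 256 MB, 44 MB
  }
}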

I will add a few more points, with examples, to Yijie Shen's answer.

Before we go into details, let's understand the following.

Assume that we are working on a Spark Standalone local system with 4 cores.

In the application, if the master is configured as below:

new SparkConf().setMaster("local[*]")

then:

defaultParallelism: 4 (taskScheduler.defaultParallelism, i.e. the number of cores)

/* Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */ 

defaultMinPartitions: 2 // Default min number of partitions for Hadoop RDDs when not given by user

* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2. 

The logic for determining defaultMinPartitions is as follows:

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
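
Both values can be checked directly on the driver (a small sketch, assuming sc is the SparkContext created with the local[*] master above):

println(sc.defaultParallelism)    // 4 on a 4-core local[*] master
println(sc.defaultMinPartitions)  // math.min(4, 2) = 2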

The actual partition size is defined by the following formula in the method FileInputFormat.computeSplitSize

package org.apache.hadoop.mapred;
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }
}

where,
    minSize is the Hadoop parameter mapreduce.input.fileinputformat.split.minsize (default: mapreduce.input.fileinputformat.split.minsize = 1 byte)
    blockSize is the value of dfs.block.size in cluster mode (the default value in Hadoop 2.0 is 128 MB) and of fs.local.block.size in local mode (default: fs.local.block.size = 32 MB, i.e. blockSize = 33554432 bytes)
    goalSize = totalInputSize / numPartitions
        where,
            totalInputSize is the total size in bytes of all the files in the input path
            numPartitions is the custom parameter provided to the method sc.textFile(inputPath, numPartitions) - if not provided, it will be defaultMinPartitions, i.e. 2 if the master is set to local[*]

blockSize in local mode = 33554432 bytes
33554432 / 1024 = 32768 KB
32768 / 1024 = 32 MB


Ex1: If our file size is 91 bytes

minSize = 1 (mapreduce.input.fileinputformat.split.minsize = 1 byte)
goalSize = totalInputSize / numPartitions
goalSize = 91 (file size) / 12 (partitions provided as 2nd parameter in sc.textFile) = 7

splitSize = Math.max(minSize, Math.min(goalSize, blockSize)) => Math.max(1, Math.min(7, 33554432)) = 7 // 33554432 is the block size in local mode

Splits = 91 (file size in bytes) / 7 (splitSize) => 13

FileInputFormat: Total # of splits generated by getSplits: 13

=> While computing splitSize, if goalSize (file size / number of partitions) works out to more than 32 MB, the split size is capped at the default fs.local.block.size = 32 MB, i.e. blockSize = 33554432 bytes.
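
The Ex1 arithmetic can be reproduced outside Hadoop with a few lines (a sketch only; the helper is a copy of computeSplitSize and the numbers come from the example above):

def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
  math.max(minSize, math.min(goalSize, blockSize))

val fileSize  = 91L        // bytes
val minSize   = 1L         // mapreduce.input.fileinputformat.split.minsize
val blockSize = 33554432L  // fs.local.block.size in local mode (32 MB)
val goalSize  = fileSize / 12  // sc.textFile(path, 12) => 91 / 12 = 7

val splitSize = computeSplitSize(goalSize, minSize, blockSize)  // max(1, min(7, 32 MB)) = 7
println(splitSize)                                      // 7
println(math.ceil(fileSize.toDouble / splitSize).toInt) // 13 splits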