How does SparkContext.textFile work under the covers?
I am trying to understand the textFile method in depth, but I think my lack of Hadoop knowledge is holding me back here. Let me lay out my understanding and maybe you can correct anything that is incorrect.
When sc.textFile(path) is called, defaultMinPartitions is used, which is really just math.min(taskScheduler.defaultParallelism, 2). Let's assume we are using the SparkDeploySchedulerBackend, where that parallelism is
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
So, now let's say the default is 2. Going back to textFile, this value is passed on to HadoopRDD. The true size is determined in getPartitions() using inputFormat.getSplits(jobConf, minPartitions). But from what I can find, the partition count is merely a hint and is in fact mostly ignored, so you will probably just get the total number of blocks.
OK, this fits with expectations. However, what if the default is not used and you provide a partition size that is larger than the block size? If my research is right, the getSplits call simply ignores this parameter, so wouldn't the provided minimum end up being ignored and you would still just get the block size?
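For concreteness, this is the kind of thing I am experimenting with (a minimal sketch; the path is made up and the comments only record what I expect to see, not verified output):
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("textFile-question").setMaster("local[*]"))

// Uses defaultMinPartitions = math.min(defaultParallelism, 2) as the minPartitions hint
val rdd1 = sc.textFile("hdfs:///data/some-file.txt")

// Explicit minPartitions hint, passed straight through to inputFormat.getSplits
val rdd2 = sc.textFile("hdfs:///data/some-file.txt", 10)

// The actual partition count comes back from the InputFormat's splits
println(rdd1.partitions.length)   // roughly the number of HDFS blocks?
println(rdd2.partitions.length)   // at least 10, or still just the block count?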
Short version:
Split size is determined by mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize; if it is bigger than HDFS's blockSize, multiple blocks inside the same file will be combined into a single split.
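From the Spark side you can push that parameter down through the Hadoop configuration before calling textFile. A sketch (since I'm not sure which key takes effect, it sets both; the path and sizes are placeholders):
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("min-split-size").setMaster("local[*]"))

// Ask for splits of at least 256 MB. With a 128 MB HDFS block size,
// each split should then cover about two blocks of the same file.
val minSplitBytes = (256L * 1024 * 1024).toString
sc.hadoopConfiguration.set("mapred.min.split.size", minSplitBytes)
sc.hadoopConfiguration.set("mapreduce.input.fileinputformat.split.minsize", minSplitBytes)

val rdd = sc.textFile("hdfs:///data/big-file.txt")
println(rdd.partitions.length)   // should be fewer partitions than HDFS blocks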
Detailed version:
I think you have the steps before inputFormat.getSplits right.
Inside inputFormat.getSplits, or more specifically, inside FileInputFormat's getSplits, it is mapred.min.split.size or mapreduce.input.fileinputformat.split.minsize that ultimately determines the split size. (I'm not sure which one is effective in Spark; I'm inclined to believe it is the former.)
Let's look at the code: FileInputFormat from Hadoop 2.4.0
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

// generate splits
ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
NetworkTopology clusterMap = new NetworkTopology();
for (FileStatus file : files) {
  Path path = file.getPath();
  long length = file.getLen();
  if (length != 0) {
    FileSystem fs = path.getFileSystem(job);
    BlockLocation[] blkLocations;
    if (file instanceof LocatedFileStatus) {
      blkLocations = ((LocatedFileStatus) file).getBlockLocations();
    } else {
      blkLocations = fs.getFileBlockLocations(file, 0, length);
    }
    if (isSplitable(fs, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);

      long bytesRemaining = length;
      while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
        String[] splitHosts = getSplitHosts(blkLocations,
            length - bytesRemaining, splitSize, clusterMap);
        splits.add(makeSplit(path, length - bytesRemaining, splitSize,
            splitHosts));
        bytesRemaining -= splitSize;
      }

      if (bytesRemaining != 0) {
        String[] splitHosts = getSplitHosts(blkLocations, length
            - bytesRemaining, bytesRemaining, clusterMap);
        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
            splitHosts));
      }
    } else {
      String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
      splits.add(makeSplit(path, 0, length, splitHosts));
    }
  } else {
    // Create empty hosts array for zero length files
    splits.add(makeSplit(path, 0, length, new String[0]));
  }
}
Inside the for loop, makeSplit() is used to generate each split, and splitSize is the effective split size. The computeSplitSize function produces splitSize:
protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
Hence, if minSplitSize > blockSize, each output split is actually a combination of several blocks within the same HDFS file; conversely, if minSplitSize < blockSize, each split corresponds to exactly one HDFS block.
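A quick numeric check of that formula, using made-up numbers (a 1 GB file, 2 requested partitions, 128 MB HDFS blocks):
// splitSize = max(minSize, min(goalSize, blockSize))
def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
  math.max(minSize, math.min(goalSize, blockSize))

val blockSize = 128L * 1024 * 1024            // 128 MB HDFS block
val goalSize  = (1024L * 1024 * 1024) / 2     // 1 GB / 2 partitions = 512 MB

// Default minSize of 1 byte: each block becomes its own split
println(computeSplitSize(goalSize, 1L, blockSize))                   // 134217728 (128 MB)

// minSize raised above the block size: blocks get merged into bigger splits
println(computeSplitSize(goalSize, 256L * 1024 * 1024, blockSize))   // 268435456 (256 MB)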
I will add a few more points, with examples, to Yijie Shen's answer.
Before we go into details, let's understand the following.
Assume that we are working on a Spark Standalone local system with 4 cores.
In the application, if master is configured as
new SparkConf().setMaster("local[*]"), then:
defaultParallelism : 4 (taskScheduler.defaultParallelism ie no.of cores)
/* Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
defaultMinPartitions : 2 //Default min number of partitions for Hadoop RDDs when not given by user
* Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
The logic for finding defaultMinPartitions is as follows:
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
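You can check both values directly on the SparkContext (a small sketch, assuming a 4-core machine and master = local[*]):
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("parallelism-check").setMaster("local[*]"))
println(sc.defaultParallelism)    // 4 on a 4-core machine
println(sc.defaultMinPartitions)  // math.min(4, 2) = 2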
The actual partition (split) size is defined by the following formula in the method FileInputFormat.computeSplitSize:
package org.apache.hadoop.mapred;

public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
  protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }
}
where,
minSize is the Hadoop parameter mapreduce.input.fileinputformat.split.minsize (default mapreduce.input.fileinputformat.split.minsize = 1 byte)
blockSize is the value of dfs.block.size in cluster mode (the default in Hadoop 2.0 is 128 MB) and fs.local.block.size in local mode (default fs.local.block.size = 32 MB, i.e. blockSize = 33554432 bytes)
goalSize = totalInputSize / numPartitions
where,
totalInputSize is the total size in bytes of all the files in the input path
numPartitions is the custom parameter provided to the method sc.textFile(inputPath, numPartitions) - if not provided, it will be defaultMinPartitions, i.e. 2 if master is set as local[*]
In local mode, blockSize = fs.local.block.size = 33554432 bytes
33554432 / 1024 = 32768 KB
32768 / 1024 = 32 MB
Ex1: If our file size is 91 bytes and we call sc.textFile(inputPath, 12):
minSize = 1 (mapreduce.input.fileinputformat.split.minsize = 1 byte)
goalSize = totalInputSize / numPartitions
goalSize = 91 (file size) / 12 (partitions provided as the 2nd parameter to sc.textFile) = 7 (integer division)
splitSize = Math.max(minSize, Math.min(goalSize, blockSize)) => Math.max(1, Math.min(7, 33554432)) = 7 // 33554432 is the block size in local mode
Splits = 91 (file size in bytes) / 7 (splitSize) => 13
FileInputFormat: Total # of splits generated by getSplits: 13
=> When computing splitSize, if the file size (more precisely, goalSize = file size / numPartitions) is greater than 32 MB, the split size takes the default fs.local.block.size = 32 MB, i.e. blockSize = 33554432 bytes.
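Ex2 (hypothetical numbers, not from a real run): a 100 MB file read in local mode with the default of 2 partitions. The same arithmetic in a few lines of Scala (the SPLIT_SLOP check in getSplits can reduce the count by one when the tail is within 10% of splitSize; it is ignored here):
val blockSize      = 33554432L                                     // fs.local.block.size = 32 MB
val totalInputSize = 100L * 1024 * 1024                            // 100 MB file
val goalSize       = totalInputSize / 2                            // defaultMinPartitions = 2 -> 52428800 (50 MB)
val splitSize      = math.max(1L, math.min(goalSize, blockSize))   // 33554432, the 32 MB block size wins
val numSplits      = math.ceil(totalInputSize.toDouble / splitSize).toInt   // 4 splits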