Mapreduce java heap space error during reduce phase
I have a simple MapReduce job that is supposed to build a TF-IDF index, but I always end up with a Java heap space error when the reducer is at roughly 70%. I have tried different approaches, used various data structures, told the job to use more memory on the command line, and ran it on smaller samples, but nothing changed, not even slightly. I am out of ideas, so I would appreciate any hint about what is going on.

The mapper produces the correct output, but the reducer always fails with the Java heap space error.

This is the command I run (I am trying to specify the amount of memory to use):

hadoop jar WordCountMPv1.jar -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048 --input /user/myslima3/wiki2 --output /user/myslima3/index

My whole MapReduce code:
public class Indexer extends Configured implements Tool {

    /*
     * Vocabulary: key = term, value = index
     */
    private static Map<String, Integer> vocab = new HashMap<String, Integer>();
    private static Map<String, Double> mapIDF = new HashMap<String, Double>();
    private static final int DOC_COUNT = 751300; // total number of documents

    public static void main(String[] arguments) throws Exception {
        System.exit(ToolRunner.run(new Indexer(), arguments));
    }

    public static class Comparator extends WritableComparator {
        protected Comparator() {
            super(Text.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            return -a.compareTo(b);
        }
    }

    public static class IndexerMapper extends
            Mapper<Object, Text, IntWritable, Text> {
        private Text result = new Text();

        // load vocab from distributed cache
        public void setup(Context context) throws IOException {
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            URI[] cacheFiles = DistributedCache.getCacheFiles(conf);
            Path getPath = new Path(cacheFiles[0].getPath());

            BufferedReader bf = new BufferedReader(new InputStreamReader(
                    fs.open(getPath)));

            String line = null;
            while ((line = bf.readLine()) != null) {
                StringTokenizer st = new StringTokenizer(line, " \t");
                int index = Integer.parseInt(st.nextToken()); // first token is the line number - term id
                String word = st.nextToken(); // second element is the term
                double IDF = Integer.parseInt(st.nextToken()); // third token is the DF

                // compute IDF
                IDF = (Math.log(DOC_COUNT / IDF));
                mapIDF.put(word, IDF);

                // save vocab
                vocab.put(word, index);
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // init TF map
            Map<String, Integer> mapTF = new HashMap<String, Integer>();

            // parse input string
            StringTokenizer st = new StringTokenizer(value.toString(), " \t");

            // first element is doc index
            int index = Integer.parseInt(st.nextToken());
            //sb.append(index + "\t");

            // count term frequencies
            String word;
            while (st.hasMoreTokens()) {
                word = st.nextToken();
                // check if word is in the vocabulary
                if (vocab.containsKey(word)) {
                    if (mapTF.containsKey(word)) {
                        int count = mapTF.get(word);
                        mapTF.put(word, count + 1);
                    } else {
                        mapTF.put(word, 1);
                    }
                }
            }

            // compute TF-IDF
            double idf;
            double tfidf;
            int wordIndex;
            for (String term : mapTF.keySet()) {
                int tf = mapTF.get(term);
                if (mapIDF.containsKey(term)) {
                    idf = mapIDF.get(term);
                    tfidf = tf * idf;
                    wordIndex = vocab.get(term);
                    context.write(new IntWritable(wordIndex), new Text(index + ":" + tfidf));
                }
            }
        }
    }

    public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
        @Override
        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // reset vocab and maps to reduce memory
            vocab = null;
            mapIDF = null;

            StringBuilder sb = new StringBuilder();
            for (Text value : values) {
                sb.append(value.toString() + " ");
            }
            context.write(key, new Text(sb.toString()));
        }
    }

    @Override
    public int run(String[] arguments) throws Exception {
        ArgumentParser parser = new ArgumentParser("TextPreprocessor");
        parser.addArgument("input", true, true, "specify input directory");
        parser.addArgument("output", true, true, "specify output directory");
        parser.parseAndCheck(arguments);

        Path inputPath = new Path(parser.getString("input"));
        Path outputDir = new Path(parser.getString("output"));

        // Create configuration.
        Configuration conf = getConf();

        // add distributed file with vocabulary
        DistributedCache
                .addCacheFile(new URI("/user/myslima3/vocab.txt"), conf);

        // Create job.
        Job job = new Job(conf, "WordCount");
        job.setJarByClass(IndexerMapper.class);

        // Setup MapReduce.
        job.setMapperClass(IndexerMapper.class);
        job.setReducerClass(IndexerReducer.class);

        // Sort the output words in reversed order.
        job.setSortComparatorClass(Comparator.class);

        job.setNumReduceTasks(1);

        // Specify (key, value).
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input.
        FileInputFormat.addInputPath(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);

        // Output.
        FileOutputFormat.setOutputPath(job, outputDir);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileSystem hdfs = FileSystem.get(conf);

        // Delete output directory (if exists).
        if (hdfs.exists(outputDir))
            hdfs.delete(outputDir, true);

        // Execute the job.
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Thanks for any help!

EDIT: stack trace
15/04/06 10:54:38 INFO mapreduce.Job: map 0% reduce 0%
15/04/06 10:54:52 INFO mapreduce.Job: map 25% reduce 0%
15/04/06 10:54:54 INFO mapreduce.Job: map 31% reduce 0%
15/04/06 10:54:55 INFO mapreduce.Job: map 50% reduce 0%
15/04/06 10:54:56 INFO mapreduce.Job: map 55% reduce 0%
15/04/06 10:54:58 INFO mapreduce.Job: map 58% reduce 0%
15/04/06 10:55:00 INFO mapreduce.Job: map 63% reduce 0%
15/04/06 10:55:07 INFO mapreduce.Job: map 69% reduce 0%
15/04/06 10:55:08 INFO mapreduce.Job: map 82% reduce 0%
15/04/06 10:55:10 INFO mapreduce.Job: map 88% reduce 0%
15/04/06 10:55:11 INFO mapreduce.Job: map 96% reduce 0%
15/04/06 10:55:12 INFO mapreduce.Job: map 100% reduce 0%
15/04/06 10:55:25 INFO mapreduce.Job: map 100% reduce 29%
15/04/06 10:55:31 INFO mapreduce.Job: map 100% reduce 36%
15/04/06 10:55:34 INFO mapreduce.Job: map 100% reduce 48%
15/04/06 10:55:37 INFO mapreduce.Job: map 100% reduce 61%
15/04/06 10:55:40 INFO mapreduce.Job: map 100% reduce 68%
15/04/06 10:55:43 INFO mapreduce.Job: map 100% reduce 71%
15/04/06 10:55:44 INFO mapreduce.Job: Task Id : attempt_1427101801879_0658_r_000000_0, Status : FAILED
Error: Java heap space
Take a close look at the StringBuilder you append to in the reducer. You do not give it an initial size, so it starts at the default capacity of 16. As it grows, it has to copy itself into larger and larger buffers, so you end up with buffers of length 16, 32, 48, 64, ... (I am not sure of the exact growth amounts, but you get the idea). In any case, a huge number of values arriving at one reducer means a lot of memory gets used, and garbage collection can cope with most of it until the StringBuilder becomes too large to grow any further. In other words, this does not scale well.
That said, since this is the algorithm you have chosen, all I can suggest is that you try giving it a very large initial capacity and see whether you get lucky and the fully grown buffer happens to fit in the available memory.
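For example, a minimal sketch of the reducer with a pre-sized builder; the capacity below is only a guess and would have to be tuned to the largest postings list a single term can produce:

public static class IndexerReducer extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Pre-size the builder; as long as the concatenated output fits, it never has to grow.
        // 32M chars is roughly 64 MB of heap for the backing char[] alone; adjust this to your
        // data and to mapreduce.reduce.memory.mb.
        StringBuilder sb = new StringBuilder(32 * 1024 * 1024);
        for (Text value : values) {
            sb.append(value.toString()).append(' ');
        }
        context.write(key, new Text(sb.toString()));
    }
}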
Failing that, you could perhaps write a special OutputFormat that concatenates the values as they are written and starts a new line whenever the key changes, but I have not fully thought that through.
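Something along these lines, using the new-API FileOutputFormat. This is only an untested sketch (ConcatOutputFormat is a made-up name), and the reducer would then call context.write(key, value) once per value instead of building one big string:

public static class ConcatOutputFormat extends FileOutputFormat<IntWritable, Text> {
    @Override
    public RecordWriter<IntWritable, Text> getRecordWriter(TaskAttemptContext job)
            throws IOException {
        Path file = getDefaultWorkFile(job, "");
        final FSDataOutputStream out = file.getFileSystem(job.getConfiguration()).create(file, false);
        return new RecordWriter<IntWritable, Text>() {
            private IntWritable lastKey = null;

            @Override
            public void write(IntWritable key, Text value) throws IOException {
                if (lastKey == null || !lastKey.equals(key)) {
                    // key changed: finish the previous line and start a new one
                    if (lastKey != null) {
                        out.writeBytes("\n");
                    }
                    out.writeBytes(key.toString() + "\t");
                    lastKey = new IntWritable(key.get());
                } else {
                    out.writeBytes(" ");
                }
                out.writeBytes(value.toString());
            }

            @Override
            public void close(TaskAttemptContext context) throws IOException {
                if (lastKey != null) {
                    out.writeBytes("\n");
                }
                out.close();
            }
        };
    }
}

You would then register it with job.setOutputFormatClass(ConcatOutputFormat.class) instead of TextOutputFormat.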
Solved my problem by specifying more reducers and implementing a combiner.
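A rough sketch of what that could look like (IndexerCombiner and the reducer count of 8 are just placeholders; the combiner reuses the reducer's concatenation so each reducer receives fewer, already merged values):

public static class IndexerCombiner extends Reducer<IntWritable, Text, IntWritable, Text> {
    @Override
    public void reduce(IntWritable key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Merge the partial postings emitted on the map side into a single value per key,
        // so fewer records are shuffled and buffered on the reduce side.
        StringBuilder sb = new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString()).append(' ');
        }
        context.write(key, new Text(sb.toString().trim()));
    }
}

// in run(), instead of a single reducer:
job.setCombinerClass(IndexerCombiner.class);
job.setNumReduceTasks(8); // pick a count that suits your cluster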