hadoop mapreduce生成不同长度的子串

hadoop mapreduce to generate substrings of different lengths

我正在使用 Hadoop mapreduce 编写代码来获取不同长度的子字符串。给定字符串 "ZYXCBA" 和长度 3 的示例(使用文本文件,我将输入作为“3 ZYXCBA”)。我的代码必须 return 所有可能的长度为 3 的字符串("ZYX"、"YXC"、"XCB"、"CBA")、长度为 4("ZYXC"、 "YXCB","XCBA") 最后长度为 5("ZYXCB","YXCBA").

在地图阶段我做了以下事情:

key = 我想要的子字符串的长度

值 = "ZYXCBA".

所以映射器输出是

3,"ZYXCBA"
4,"ZYXCBA"
5,"ZYXCBA"

在 reduce 中,我采用字符串 ("ZYXCBA") 和键 3 来获取长度为 3 的所有子字符串。4,5 也是如此。结果使用字符串连接。所以减少的输出应该是:

3 "ZYX YXC XCB CBA"
4 "ZYXC YXCB XCBA"
5 "ZYXCB YXCBA" 

我是 运行 我的代码使用以下命令:

hduser@Ganesh:~/Documents$ hadoop jar Saishingles.jar hadoopshingles.Saishingles Behara/Shingles/input Behara/Shingles/output

我的代码如下图:

package hadoopshingles;

import java.io.IOException;
//import java.util.ArrayList;

import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;


public class Saishingles{

public static class shinglesmapper extends Mapper<Object, Text, IntWritable, Text>{

        public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {

            String str = new String(value.toString());
            String[] list = str.split(" ");
            int x = Integer.parseInt(list[0]);
            String val = list[1];
            int M = val.length();
            int X = M-1;


            for(int z = x; z <= X; z++)
            {
                context.write(new IntWritable(z), new Text(val));
            }

        }

     }


public static class shinglesreducer extends Reducer<IntWritable,Text,IntWritable,Text> {


    public void reduce(IntWritable key, Text value, Context context
            ) throws IOException, InterruptedException {
        int z = key.get();
        String str = new String(value.toString());
        int M = str.length();
        int Tz = M - z;
        String newvalue = "";
        for(int position = 0; position <= Tz; position++)
        {
            newvalue = newvalue + " " + str.substring(position,position + z);   
        }

        context.write(new IntWritable(z),new Text(newvalue));
    }
}




public static void main(String[] args) throws Exception {
      GenericOptionsParser parser = new GenericOptionsParser(args);
      Configuration conf = parser.getConfiguration();
      String[] otherArgs = parser.getRemainingArgs();

        if (otherArgs.length != 2) 
        {
          System.err.println("Usage: Saishingles <inputFile> <outputDir>");
          System.exit(2);
        }
      Job job = Job.getInstance(conf, "Saishingles");
      job.setJarByClass(hadoopshingles.Saishingles.class);
      job.setMapperClass(shinglesmapper.class);
      //job.setCombinerClass(shinglesreducer.class);
      job.setReducerClass(shinglesreducer.class);
      //job.setMapOutputKeyClass(IntWritable.class);
      //job.setMapOutputValueClass(Text.class);
      job.setOutputKeyClass(IntWritable.class);
      job.setOutputValueClass(Text.class);
      FileInputFormat.addInputPath(job, new Path(args[0]));
      FileOutputFormat.setOutputPath(job, new Path(args[1]));
      System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

reduce 的输出而不是 returning

3 "ZYX YXC XCB CBA"
4 "ZYXC YXCB XCBA"
5 "ZYXCB YXCBA" 

return正在

3 "ZYXCBA"
4 "ZYXCBA"
5 "ZYXCBA"

即,它提供与映射器相同的输出。不知道为什么会这样。请帮我解决这个问题,在此先感谢您的帮助 ;) :) :)

你甚至可以在没有 运行 reducer 的情况下实现这一点。你的 map/reduce 逻辑是错误的...转换应该在 Mapper 中完成。

Reduce - 在此阶段,每个 <key, (list of values)> 调用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 方法在分组输入中配​​对。

在你的 reduce 签名中:public void reduce(IntWritable key, Text value, Context context)

应该是public void reduce(IntWritable key, Iterable<Text> values, Context context)

此外,将 reduce 方法的最后一行:context.write(new IntWritable(z),new Text(newvalue)); 更改为 context.write(key,new Text(newvalue)); - 您已经从 mapper 获得了 Intwritable Key,我不会创建 new

给定输入:

3 "ZYXCBA"
4 "ZYXCBA"
5 "ZYXCBA"

Mapper 作业将输出:

3   "XCB YXC ZYX"
4   "XCBA YXCB ZYXC"
5   "YXCBA ZYXCB"

MapReduceJob:

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class SubStrings{

    public static class SubStringsMapper extends Mapper<Object, Text, IntWritable, Text> {

        @Override
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            String [] values = value.toString().split(" ");
            int len = Integer.parseInt(values[0].trim());
            String str = values[1].replaceAll("\"", "").trim();

            int endindex=len;
            for(int i = 0; i < len; i++)
            {
                endindex=i+len;
                if(endindex <= str.length())
                    context.write(new IntWritable(len), new Text(str.substring(i, endindex))); 
            }

        }   
    }

    public  static class SubStringsReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

        public void reduce(IntWritable key, Iterable<Text> values, Context context) 
                throws IOException, InterruptedException {

            String str="\""; //adding starting quotes
            for(Text value: values)
                str += " " + value;

            str=str.replace("\" ", "\"") + "\""; //adding ending quotes
            context.write(key, new Text(str));
        }
    }

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "get-possible-strings-by-length");

        job.setJarByClass(SubStrings.class);
        job.setMapperClass(SubStringsMapper.class); 
        job.setReducerClass(SubStringsReducer.class);

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        FileSystem fs = null;
        Path dstFilePath = new Path(args[1]);
        try {
            fs = dstFilePath.getFileSystem(conf);
            if (fs.exists(dstFilePath))
                fs.delete(dstFilePath, true);
        } catch (IOException e1) {
            e1.printStackTrace();
        }

        job.waitForCompletion(true);
    } 
}