Using the Hamming distance algorithm to create an index for a fuzzy join with the help of MapReduce

I am new to Java and MapReduce, and I am trying to write a MapReduce program that reads a list of words called "dictionary" and, using the Hamming distance algorithm, generates all words in the list at distance 1. I am able to generate the output, but the problem is that it seems very inefficient: I have to load the entire list into an ArrayList, and for every word I call the Hamming distance method inside the map method, so I read the whole list twice and run the Hamming distance algorithm n*n times, where n is the number of words in the list.

Please suggest a more efficient approach.

Here is the code. There is no reducer yet.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class MapJoin {


    public static class MyMapper extends Mapper<LongWritable,Text, Text, Text> {


        // Dictionary words, loaded once per mapper from the distributed cache.
        private List<String> Lst = new ArrayList<String>();


        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The driver put dictionary.txt into the distributed cache; load it here.
            Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());

            for (Path p : files) {
                if (p.getName().equals("dictionary.txt")) {
                    BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
                    String line;
                    while ((line = reader.readLine()) != null) {
                        Lst.add(line);
                    }
                    reader.close();
                }
            }
            if (Lst.isEmpty()) {
                throw new IOException("Unable to load dictionary data.");
            }
        }


        public void map(LongWritable key, Text val, Context con)
                throws IOException, InterruptedException {
            // Emit each input token together with every dictionary word
            // within Hamming distance 1 of it.
            StringTokenizer itr = new StringTokenizer(val.toString().toLowerCase());
            while (itr.hasMoreTokens()) {
                String key1 = itr.nextToken();
                String fnlstr = HammingDist(key1);
                con.write(new Text(key1), new Text(fnlstr));
            }
        }


        private String HammingDist(String ky) {
            StringBuilder result = new StringBuilder();
            for (String x : Lst) {
                char[] s1 = ky.toCharArray();
                char[] s2 = x.toCharArray();

                int shorter = Math.min(s1.length, s2.length);
                int longer = Math.max(s1.length, s2.length);

                // Count mismatching positions over the common prefix...
                int distance = 0;
                for (int i = 0; i < shorter; i++) {
                    if (s1[i] != s2[i]) distance++;
                }
                // ...and count each surplus character of the longer word as a mismatch.
                distance += longer - shorter;

                if (distance < 2) {
                    if (result.length() > 0) result.append(',');
                    result.append(x);
                }
            }
            return result.toString();
        }
    }

  public static void main(String[] args) 
                  throws IOException, ClassNotFoundException, InterruptedException {

    Job job = new Job();
    job.setJarByClass(MapJoin.class);
    job.setJobName("MapJoin");
    job.setNumReduceTasks(0);

    try {
        DistributedCache.addCacheFile(new URI("/Input/dictionary.txt"), job.getConfiguration());
    } catch (Exception e) {
        System.out.println(e);
    }

    job.setMapperClass(MyMapper.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);


  }
}

For every token you find in map, you call HammingDist, which iterates over List<String> Lst and converts each item to a char[]. It would be better to replace List<String> Lst with a List<char[]>, adding the elements converted once up front, instead of converting the same words over and over again. A sketch of that change follows.
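
Below is a minimal sketch of that change, replacing the corresponding members of MyMapper above. The field name dict is illustrative; everything else follows the question's code. Note that only the dictionary side benefits: the query word still has to be converted once per HammingDist call.

// Dictionary stored as char[] so each word is converted exactly once, in setup().
private List<char[]> dict = new ArrayList<char[]>();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
    Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
    for (Path p : files) {
        if (p.getName().equals("dictionary.txt")) {
            BufferedReader reader = new BufferedReader(new FileReader(p.toString()));
            String line;
            while ((line = reader.readLine()) != null) {
                dict.add(line.toCharArray()); // convert here, not in every HammingDist call
            }
            reader.close();
        }
    }
    if (dict.isEmpty()) {
        throw new IOException("Unable to load dictionary data.");
    }
}

private String HammingDist(String ky) {
    char[] s1 = ky.toCharArray(); // the query word is converted once per call
    StringBuilder result = new StringBuilder();
    for (char[] s2 : dict) {
        // The length difference already contributes to the distance, so start
        // there and stop comparing once the distance can no longer stay below 2.
        int distance = Math.abs(s1.length - s2.length);
        int shorter = Math.min(s1.length, s2.length);
        for (int i = 0; i < shorter && distance < 2; i++) {
            if (s1[i] != s2[i]) distance++;
        }
        if (distance < 2) {
            if (result.length() > 0) result.append(',');
            result.append(s2);
        }
    }
    return result.toString();
}

The early exit in the inner loop also stops comparing a pair of words as soon as a second mismatch is found, which helps on long words; the scan over the dictionary itself is still linear per token.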