如何找到 HDFS 中文件之间的对称差异?

How to find symmetric difference between files in HDFS?

我在 hdfs 中有 2 个文件:/my/path/in/hdfs/part-r-*(大约 1000 个部分,每个约 10000 行)和 /my/another/path/in/hdfs/part-r-*(相同大小)。第一个文件包含以下形式的数据:

id1 111
id6 212
id3 984

等等。 第二个是这样的:

999 id8
15 id4
93 id1

我想找到第一个文件中没有出现在第二个文件中的所有 ID,反之亦然。有什么简单的方法吗?

我必须承认,我怀疑这种计算是否适合 MapReduce 的严格范例,纯粹基于该过程的复杂性和计算量(即使您说您的案例中的两个输入文件大小相同),所以我认为这是一个很好的案例,可以在保持简单的同时找到偷工减料的方法。

首先,为了摆脱额外的 IO 负担,您可能希望将这两个文件放入一个目录中(为了简单起见,我们在这里说 \input)以绕过多个输入混乱。之后,只需要一个 MapReduce 作业就可以轻松得多。

在 Map 阶段,您需要做的就是将两个文件的 ID 设置为键,并将它们出现的“文件名”设置为它们的值(这是一种找到对称差异的安全方法,同时继续概括地说,一个 ID 可能会在一个文件中多次出现)。这些“文件名”实际上不需要是实际的文件名,您可以只放置 AB 的字符串来指示在第一个或第二个文件中分别找到该特定行中的特定 ID .

在 Reduce 阶段,您可以将所有引用单个 key/ID 的值放入 HashSet 集合中,该集合包含所有 unique 值你投入其中。这意味着对于每个 reducer(也就是每个 ID),都会创建一个 HashSet 来放置 AB 字符串的多个实例,只存储 one 这些实例。所以:

  • 仅在第一个文件中出现的 ID 将有一个 HashSet 集合,其中只有 A
  • 只在第二个文件中看到的 ID 将有一个 HashSet 集合,其中只有 B
  • 在两个文件中看到的 ID 将有一个 HashSet 集合,其中包含 AB(又名 交集不需要的文件数量)。

有了它,您可以简单地检查每个 ID 的 HashSet 并只写上面描述的前两个选项的 ID。

这种类型的工作可以看起来像这样(这里的 Reduce 函数实际上不需要在键值对中有值,所以我只是放一个空的 String 让事情更简单):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.*;
import java.io.IOException;
import java.util.*;
import java.nio.charset.StandardCharsets;

public class SymDiff 
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <ID, file>
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text> 
    {
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            String[] line = value.toString().split(" ");    // split each line to two columns
            
            // if the first column consists of integers, put the ID from the 2nd column as the key
            // and set "B" as the value to imply that the particular ID was found on the second file
            // else, put the ID from the first column as the key
            // and set "A" as the value to imply that the particular ID was found on the first file
            if(line[0].matches("\d+"))     // (lazy way to see if the first string is an int without throwing an exception)
                context.write(new Text(line[1]), new Text("B"));
            else
                context.write(new Text(line[0]), new Text("A"));
        }
    }

    /* input: <ID, file>
     * output: <ID, "">
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException 
        {
            HashSet<String> list_of_files = new HashSet<String>();

            // store all the instances of "A" and "B" for each ID in a HashSet with unique values
            for(Text value : values)
                list_of_files.add(value.toString());
            
            // only write the IDs which they values only contain "A" or "B" (and not both) on their set 
            if(list_of_files.contains("A") && !list_of_files.contains("B") || (!list_of_files.contains("A") && list_of_files.contains("B")))
                context.write(key, new Text(""));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("input");
        Path output_dir = new Path("output");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job job = Job.getInstance(conf, "Symmetric Difference");
        job.setJarByClass(SymDiff.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);    
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input_dir);
        FileOutputFormat.setOutputPath(job, output_dir);
        job.waitForCompletion(true);
    }
}

您可以在下面的屏幕截图中看到所需的输出: