Hadoop mapreduce 2文件过滤?

Hadoop mapreduce 2 files filtering?

我需要打印出那些没有订单号的客户的“姓名”。我知道我必须使用映射器方法来实例化变量。我也必须使用 2 个映射器,因为有 2 个输入文件。在reduce阶段,我必须过滤掉没有order.no的客户。但是,如何过滤掉那些没有订单号的客户?

File1.txt

 Cust.No. Name

 1        Adam
 2        Abe
 3        Alex
 4        Jones

File2.txt

    Order.Num.    Cust.No.     Price
    01            1            5422
    02            1            23
    03            2            1265
    04            3            127

我做了什么

最初在 reducer 方法中,我循环键并检查它是否与现有键匹配:

if (!(Data[0].equals("key")))
    {
        System.out.println(Data[1]);
    }

但是,它会打印每一行。

似乎是一个常规的 reduce 侧连接,所以它可能在某种程度上是一个普通的用例,但是这些类型的计算往往在工作负载方面变得非常残酷。这意味着我们必须找到偷工减料的方法,以确保应用程序能够很好地扩展以适应更大规模的输入。

为应用程序的执行节省 time/space 的最常见方法是尝试以一种我们可以在保留所有功能的同时“削减”一个或多个作业的方式来设计可能的多个 MR 作业,或尝试最小化将在输入数据上实现的(自定义)映射器的数量。两者中的后者对于您要实现的这种过滤非常常见,因为我们可以轻松地只使用一个 Map 函数,它的每个实例都会检查当前正在读取的文件的名称以采取相应的行动。

更具体地说,我们可以通过Map class的setup函数获取映射器开始运行之前的File1.txtFile2.txt文件名], 并使用当前要读取的文件的名称来确定如何将文件中的数据切碎并存储为键值对。对于你的问题,这个Map函数会输出两种键值对:

  • <customer_ID, customer_name>(对于 File1.txt 中的数据)

  • <customer_ID, order_ID>(对于 File2.txt 中的数据)

然后 Reduce 函数的实例将为每个客户运行(当然,因为客户 ID 和名称是唯一的)并访问分组值,这些值只不过是 Text 包含此客户名称或订单 ID 的对象。我们只想输出没有任何订单记录的客户,所以我们要做的就是检查这个值列表的长度是否为 1(也就是这个客户是否没有其他对值用他的名字)。

为了展示这一点,我将两个输入文件都放在 HDFS 的目录 /input 中(我对 File1.txt 中的列使用了两个制表符分隔符,对列使用了三个制表符分隔符File2.txt。如果您的文件在列之间有不同的制表符或空格,您可以相应地更改它们):

File1.txt

Cust.No Name
1       Adam
2       Abe
3       Alex
4       Jones

File2.txt

Order.Num.  Cust.No.    Price
01          1           5422
02          1           23
03          2           1265
04          3           127

执行过滤的程序可能如下所示:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class OrderListFilter
{
    /* input:  <byte_offset, line_of_dataset>
     * output: <customer_ID, customer_name> OR <customer_ID, order_ID>
     */
    public static class Map extends Mapper<LongWritable, Text, Text, Text>
    {
        private String current_filename = "";

        protected void setup(Context context)
        {
            // get the name of the current to-be-read file
            InputSplit split = context.getInputSplit();
            Path path = ((FileSplit) split).getPath();
            current_filename = path.getName();
        }

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
        {
            if(current_filename.equals("File1.txt"))    // if mapper is reading through the customer's file
            {
                if(value.toString().contains("Cust.No"))    // remove header
                    return;
                else
                {
                    String[] columns = value.toString().split("\t\t");  // 2 tabs as delimiter

                    // write customer ID as key and name as value
                    context.write(new Text(columns[0]), new Text(columns[1]));
                }
            }
            else if(current_filename.equals("File2.txt"))   // if mapper is reading through the order's file
            {
                if(value.toString().contains("Cust.No"))    // remove header
                    return;
                else
                {
                    String[] columns = value.toString().split("\t\t\t"); // 3 tabs as delimiter

                    // write customer ID as key and order num as value
                    context.write(new Text(columns[1]), new Text(columns[0]));
                }
            }
        }
    }

    /* input: <customer_ID, customer_name> OR <customer_ID, order_ID>
     * output: <customer_ID, customer_name>
     */
    public static class Reduce extends Reducer<Text, Text, Text, Text>
    {
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
        {
            List<String> customer_records = new ArrayList<String>();

            // put all the values in a list to find the size of them
            for(Text value : values)
                customer_records.add(value.toString());

            // if there's only one record, i.e. just the ID and the customer's name in they key-value pairs,
            // write their ID and name to output
            if(customer_records.size() == 1)
                context.write(key, new Text(customer_records.get(0)));
        }
    }


    public static void main(String[] args) throws Exception
    {
        // set the paths of the input and output directories in the HDFS
        Path input_dir = new Path("input");
        Path output_dir = new Path("output");

        // in case the output directory already exists, delete it
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        if(fs.exists(output_dir))
            fs.delete(output_dir, true);

        // configure the MapReduce job
        Job job = Job.getInstance(conf, "Order List Filter");
        job.setJarByClass(OrderListFilter.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, input_dir);
        FileOutputFormat.setOutputPath(job, output_dir);
        job.waitForCompletion(true);
    }
}

它的输出似乎还不错(忽略我设置中的警告):