Hadoop mapreduce 2文件过滤?
Hadoop mapreduce 2 files filtering?
我需要打印出那些没有订单号的客户的“姓名”。我知道我必须使用映射器方法来实例化变量。我也必须使用 2 个映射器,因为有 2 个输入文件。在reduce阶段,我必须过滤掉没有order.no的客户。但是,如何过滤掉那些没有订单号的客户?
File1.txt
Cust.No. Name
1 Adam
2 Abe
3 Alex
4 Jones
File2.txt
Order.Num. Cust.No. Price
01 1 5422
02 1 23
03 2 1265
04 3 127
我做了什么
最初在 reducer 方法中,我循环键并检查它是否与现有键匹配:
if (!(Data[0].equals("key")))
{
System.out.println(Data[1]);
}
但是,它会打印每一行。
似乎是一个常规的 reduce 侧连接,所以它可能在某种程度上是一个普通的用例,但是这些类型的计算往往在工作负载方面变得非常残酷。这意味着我们必须找到偷工减料的方法,以确保应用程序能够很好地扩展以适应更大规模的输入。
为应用程序的执行节省 time/space 的最常见方法是尝试以一种我们可以在保留所有功能的同时“削减”一个或多个作业的方式来设计可能的多个 MR 作业,或尝试最小化将在输入数据上实现的(自定义)映射器的数量。两者中的后者对于您要实现的这种过滤非常常见,因为我们可以轻松地只使用一个 Map 函数,它的每个实例都会检查当前正在读取的文件的名称以采取相应的行动。
更具体地说,我们可以通过Map class的setup
函数获取映射器开始运行之前的File1.txt
和File2.txt
文件名], 并使用当前要读取的文件的名称来确定如何将文件中的数据切碎并存储为键值对。对于你的问题,这个Map函数会输出两种键值对:
<customer_ID, customer_name>
(对于 File1.txt
中的数据)
<customer_ID, order_ID>
(对于 File2.txt
中的数据)
然后 Reduce 函数的实例将为每个客户运行(当然,因为客户 ID 和名称是唯一的)并访问分组值,这些值只不过是 Text
包含此客户名称或订单 ID 的对象。我们只想输出没有任何订单记录的客户,所以我们要做的就是检查这个值列表的长度是否为 1
(也就是这个客户是否没有其他对值用他的名字)。
为了展示这一点,我将两个输入文件都放在 HDFS 的目录 /input
中(我对 File1.txt
中的列使用了两个制表符分隔符,对列使用了三个制表符分隔符File2.txt
。如果您的文件在列之间有不同的制表符或空格,您可以相应地更改它们):
File1.txt
Cust.No Name
1 Adam
2 Abe
3 Alex
4 Jones
File2.txt
Order.Num. Cust.No. Price
01 1 5422
02 1 23
03 2 1265
04 3 127
执行过滤的程序可能如下所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class OrderListFilter
{
/* input: <byte_offset, line_of_dataset>
* output: <customer_ID, customer_name> OR <customer_ID, order_ID>
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
private String current_filename = "";
protected void setup(Context context)
{
// get the name of the current to-be-read file
InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
current_filename = path.getName();
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
if(current_filename.equals("File1.txt")) // if mapper is reading through the customer's file
{
if(value.toString().contains("Cust.No")) // remove header
return;
else
{
String[] columns = value.toString().split("\t\t"); // 2 tabs as delimiter
// write customer ID as key and name as value
context.write(new Text(columns[0]), new Text(columns[1]));
}
}
else if(current_filename.equals("File2.txt")) // if mapper is reading through the order's file
{
if(value.toString().contains("Cust.No")) // remove header
return;
else
{
String[] columns = value.toString().split("\t\t\t"); // 3 tabs as delimiter
// write customer ID as key and order num as value
context.write(new Text(columns[1]), new Text(columns[0]));
}
}
}
}
/* input: <customer_ID, customer_name> OR <customer_ID, order_ID>
* output: <customer_ID, customer_name>
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
List<String> customer_records = new ArrayList<String>();
// put all the values in a list to find the size of them
for(Text value : values)
customer_records.add(value.toString());
// if there's only one record, i.e. just the ID and the customer's name in they key-value pairs,
// write their ID and name to output
if(customer_records.size() == 1)
context.write(key, new Text(customer_records.get(0)));
}
}
public static void main(String[] args) throws Exception
{
// set the paths of the input and output directories in the HDFS
Path input_dir = new Path("input");
Path output_dir = new Path("output");
// in case the output directory already exists, delete it
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir, true);
// configure the MapReduce job
Job job = Job.getInstance(conf, "Order List Filter");
job.setJarByClass(OrderListFilter.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, input_dir);
FileOutputFormat.setOutputPath(job, output_dir);
job.waitForCompletion(true);
}
}
它的输出似乎还不错(忽略我设置中的警告):
我需要打印出那些没有订单号的客户的“姓名”。我知道我必须使用映射器方法来实例化变量。我也必须使用 2 个映射器,因为有 2 个输入文件。在reduce阶段,我必须过滤掉没有order.no的客户。但是,如何过滤掉那些没有订单号的客户?
File1.txt
Cust.No. Name
1 Adam
2 Abe
3 Alex
4 Jones
File2.txt
Order.Num. Cust.No. Price
01 1 5422
02 1 23
03 2 1265
04 3 127
我做了什么
最初在 reducer 方法中,我循环键并检查它是否与现有键匹配:
if (!(Data[0].equals("key")))
{
System.out.println(Data[1]);
}
但是,它会打印每一行。
似乎是一个常规的 reduce 侧连接,所以它可能在某种程度上是一个普通的用例,但是这些类型的计算往往在工作负载方面变得非常残酷。这意味着我们必须找到偷工减料的方法,以确保应用程序能够很好地扩展以适应更大规模的输入。
为应用程序的执行节省 time/space 的最常见方法是尝试以一种我们可以在保留所有功能的同时“削减”一个或多个作业的方式来设计可能的多个 MR 作业,或尝试最小化将在输入数据上实现的(自定义)映射器的数量。两者中的后者对于您要实现的这种过滤非常常见,因为我们可以轻松地只使用一个 Map 函数,它的每个实例都会检查当前正在读取的文件的名称以采取相应的行动。
更具体地说,我们可以通过Map class的setup
函数获取映射器开始运行之前的File1.txt
和File2.txt
文件名], 并使用当前要读取的文件的名称来确定如何将文件中的数据切碎并存储为键值对。对于你的问题,这个Map函数会输出两种键值对:
<customer_ID, customer_name>
(对于File1.txt
中的数据)<customer_ID, order_ID>
(对于File2.txt
中的数据)
然后 Reduce 函数的实例将为每个客户运行(当然,因为客户 ID 和名称是唯一的)并访问分组值,这些值只不过是 Text
包含此客户名称或订单 ID 的对象。我们只想输出没有任何订单记录的客户,所以我们要做的就是检查这个值列表的长度是否为 1
(也就是这个客户是否没有其他对值用他的名字)。
为了展示这一点,我将两个输入文件都放在 HDFS 的目录 /input
中(我对 File1.txt
中的列使用了两个制表符分隔符,对列使用了三个制表符分隔符File2.txt
。如果您的文件在列之间有不同的制表符或空格,您可以相应地更改它们):
File1.txt
Cust.No Name
1 Adam
2 Abe
3 Alex
4 Jones
File2.txt
Order.Num. Cust.No. Price
01 1 5422
02 1 23
03 2 1265
04 3 127
执行过滤的程序可能如下所示:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class OrderListFilter
{
/* input: <byte_offset, line_of_dataset>
* output: <customer_ID, customer_name> OR <customer_ID, order_ID>
*/
public static class Map extends Mapper<LongWritable, Text, Text, Text>
{
private String current_filename = "";
protected void setup(Context context)
{
// get the name of the current to-be-read file
InputSplit split = context.getInputSplit();
Path path = ((FileSplit) split).getPath();
current_filename = path.getName();
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
if(current_filename.equals("File1.txt")) // if mapper is reading through the customer's file
{
if(value.toString().contains("Cust.No")) // remove header
return;
else
{
String[] columns = value.toString().split("\t\t"); // 2 tabs as delimiter
// write customer ID as key and name as value
context.write(new Text(columns[0]), new Text(columns[1]));
}
}
else if(current_filename.equals("File2.txt")) // if mapper is reading through the order's file
{
if(value.toString().contains("Cust.No")) // remove header
return;
else
{
String[] columns = value.toString().split("\t\t\t"); // 3 tabs as delimiter
// write customer ID as key and order num as value
context.write(new Text(columns[1]), new Text(columns[0]));
}
}
}
}
/* input: <customer_ID, customer_name> OR <customer_ID, order_ID>
* output: <customer_ID, customer_name>
*/
public static class Reduce extends Reducer<Text, Text, Text, Text>
{
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
{
List<String> customer_records = new ArrayList<String>();
// put all the values in a list to find the size of them
for(Text value : values)
customer_records.add(value.toString());
// if there's only one record, i.e. just the ID and the customer's name in they key-value pairs,
// write their ID and name to output
if(customer_records.size() == 1)
context.write(key, new Text(customer_records.get(0)));
}
}
public static void main(String[] args) throws Exception
{
// set the paths of the input and output directories in the HDFS
Path input_dir = new Path("input");
Path output_dir = new Path("output");
// in case the output directory already exists, delete it
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if(fs.exists(output_dir))
fs.delete(output_dir, true);
// configure the MapReduce job
Job job = Job.getInstance(conf, "Order List Filter");
job.setJarByClass(OrderListFilter.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, input_dir);
FileOutputFormat.setOutputPath(job, output_dir);
job.waitForCompletion(true);
}
}
它的输出似乎还不错(忽略我设置中的警告):