Average of order_demand for each product as output - MapReduce - Java
I'm new to MapReduce and still in the learning phase, so thanks in advance for your help and any further tips. In a university exercise I ran into the following problem: from a CSV file (a sample is listed below) I want to calculate the average order_demand for each product_code.
The code shown below ("FrequencyMapper" and "FrequencyReducer") runs on my server, but I think I currently have a problem with the displayed output.
Since this is my first time working with MapReduce, I'd appreciate any help.
The mapper, reducer, and driver code are listed below.
Sample dataset (CSV file):
Product_Code,Warehouse,Product_Category,Date,Order_Demand
Product_0993,Whse_J,Category_028,2012/7/27,100
Product_0979,Whse_J,Category_028,2012/6/5,500
Product_0979,Whse_E,Category_028,2012/11/29,500
Product_1157,Whse_E,Category_006,2012/6/4,160000
Product_1159,Whse_A,Category_006,2012/7/17,50000
My target output, for example:
Product_0979 500
Product_1157 105000
...
FrequencyMapper.java:
package ma.test.a02;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FrequencyMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable offset, Text lineText, Context context)
            throws IOException, InterruptedException {
        String line = lineText.toString();
        if (line.contains("Product")) {
            String productcode = line.split(",")[0];
            float orderDemand = Float.parseFloat(line.split(",")[4]);
            context.write(new Text(productcode), new IntWritable((int) orderDemand));
        }
    }
}
FrequencyReducer.java:
package ma.test.a02;

import java.io.IOException;

import javax.xml.soap.Text;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class FrequencyReducer extends Reducer<Text, IntWritable, IntWritable, FloatWritable> {

    public void reduce(IntWritable productcode, Iterable<IntWritable> orderDemands, Context context)
            throws IOException, InterruptedException {
        float averageDemand = 0;
        float count = 0;
        for (IntWritable orderDemand : orderDemands) {
            averageDemand += orderDemand.get();
            count += 1;
        }
        float result = averageDemand / count;
        context.write(productcode, new FloatWritable(result));
    }
}
Frequency.java (Driver):
package ma.test.a02;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Frequency {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Average <input path> <output path>");
            System.exit(-1);
        }

        // create a Hadoop job and set the main class
        Job job = Job.getInstance();
        job.setJarByClass(Frequency.class);
        job.setJobName("MA-Test Average");

        // set the input and output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // set the Mapper and Reducer class
        job.setMapperClass(FrequencyMapper.class);
        job.setReducerClass(FrequencyReducer.class);

        // specify the type of the output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Hint 1: In the mapper you filter for lines containing "VOLUME":

if (line.contains("VOLUME")) {
}

But no line contains "VOLUME", so your reducer receives no input at all! The filter has to match something that actually occurs in every data line, such as "Product".
Hint 2: Your reducer's output value is a FloatWritable, so in your runner (the Frequency class) you should use this line:

job.setOutputValueClass(FloatWritable.class);

instead of this:

job.setOutputValueClass(IntWritable.class);
Hint 3: In the reducer, change this line:

public class FrequencyReducer extends Reducer<Text, IntWritable, IntWritable, FloatWritable>

to this:

public class FrequencyReducer extends Reducer<Text, IntWritable, Text, FloatWritable>

and change the first parameter of reduce() from IntWritable to Text so that it matches the mapper's output key. Also make sure Text is imported from org.apache.hadoop.io, not from javax.xml.soap. Then add these lines to the Frequency class:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
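
In case the generics are confusing: the four type parameters of Reducer are <KEYIN, VALUEIN, KEYOUT, VALUEOUT>. The first two must match what the mapper emits, and the last two must match what the driver declares:

// KEYIN, VALUEIN   = the mapper's output types (Text, IntWritable)
// KEYOUT, VALUEOUT = job.setOutputKeyClass / job.setOutputValueClass
public class FrequencyReducer
        extends Reducer<Text, IntWritable, Text, FloatWritable> { ... }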
Hint 4: The first line of the CSV file, which describes the file's structure, causes problems. Reject it by putting the following lines at the start of the map method:

if (line.contains("Product_Code,Warehouse")) {
    return;
}
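
As an aside (a sketch that goes beyond the original answer): the map key is the line's byte offset, so the header can also be skipped by checking for offset zero. This only works when the header is the very first line of its file/split:

// the header is the first line of the file, so its byte offset is 0
if (offset.get() == 0) {
    return;
}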
Hint 5: In a real program, make sure you have a plan for rows whose Order_Demand column contains a String that cannot be converted to an Integer.
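A minimal sketch of such a guard inside map() (illustrative, not part of the original answer):

String[] fields = line.split(",");
try {
    int orderDemand = Integer.parseInt(fields[4].trim());
    context.write(new Text(fields[0].trim()), new IntWritable(orderDemand));
} catch (NumberFormatException e) {
    // skip (or count/log) rows whose Order_Demand is not a valid integer
}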
Finally, your mapper will look like this:
public class FrequencyMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable offset, Text lineText, Context context)
            throws IOException, InterruptedException {
        String line = lineText.toString();
        // reject the CSV header line
        if (line.contains("Product_Code,Warehouse")) {
            return;
        }
        if (line.contains("Product")) {
            String productcode = line.split(",")[0].trim();
            int orderDemand = Integer.valueOf(line.split(",")[4].trim());
            context.write(new Text(productcode), new IntWritable(orderDemand));
        }
    }
}
And here is your reducer:
public class FrequencyReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {

    public void reduce(Text productcode, Iterable<IntWritable> orderDemands, Context context)
            throws IOException, InterruptedException {
        // sum all demands for this product and count the rows
        float averageDemand = 0;
        float count = 0;
        for (IntWritable orderDemand : orderDemands) {
            averageDemand += orderDemand.get();
            count += 1;
        }
        float result = averageDemand / count;
        context.write(productcode, new FloatWritable(result));
    }
}
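
One design note that goes beyond the original answer: this reducer cannot also be registered as a combiner, because an average of partial averages is not the overall average. A combiner-friendly variant (a hypothetical sketch, assuming the mapper emits "demand,1" pairs packed into a Text) would aggregate (sum, count) pairs and divide only at the very end:

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical: values arrive as "sum,count" strings; this class can serve as
// the combiner, and the final reducer would merge pairs the same way before dividing.
public class SumCountCombiner extends Reducer<Text, Text, Text, Text> {
    @Override
    public void reduce(Text productcode, Iterable<Text> partials, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        long count = 0;
        for (Text partial : partials) {
            String[] parts = partial.toString().split(",");
            sum += Long.parseLong(parts[0]);
            count += Long.parseLong(parts[1]);
        }
        context.write(productcode, new Text(sum + "," + count));
    }
}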
And here is your runner:
public class Frequency {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Average <input path> <output path>");
            System.exit(-1);
        }

        // create a Hadoop job and set the main class
        Job job = Job.getInstance();
        job.setJarByClass(Frequency.class);
        job.setJobName("MA-Test Average");

        // set the input and output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // set the Mapper and Reducer class
        job.setMapperClass(FrequencyMapper.class);
        job.setReducerClass(FrequencyReducer.class);

        // specify the type of the output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
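
To try it out, package the three classes into a jar (the name average.jar below is just an example, not from the original post) and submit it with the standard hadoop jar command:

hadoop jar average.jar ma.test.a02.Frequency <input path> <output path>

The per-product averages end up in the part-r-* files of the output directory; for the sample rows above, Product_0979 comes out as 500.0.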