Average of order_demand for each product as output - MapReduce - Java

I am new to MapReduce and still in the learning phase, so thanks in advance for your help and any further hints. In a university exercise I ran into the following problem: from a csv file (a sample is listed below), I want to compute the average order_demand for each product_code.

The code shown below ("FrequencyMapper" and "FrequencyReducer") runs on my server, but I think there is currently a problem with the output. Since this is my first time working with MapReduce, I am grateful for any help.

The mapper, reducer, and driver code are listed below.

Sample dataset (csv file):

Product_Code,Warehouse,Product_Category,Date,Order_Demand
Product_0993,Whse_J,Category_028,2012/7/27,100
Product_0979,Whse_J,Category_028,2012/6/5,500 
Product_0979,Whse_E,Category_028,2012/11/29,500 
Product_1157,Whse_E,Category_006,2012/6/4,160000 
Product_1159,Whse_A,Category_006,2012/7/17,50000 

My goal, for example:

Product_0979   500
Product_1157   105000
...

FrequencyMapper.java:

package ma.test.a02;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FrequencyMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {
 
 @Override
  public void map(LongWritable offset, Text lineText, Context context)
      throws IOException, InterruptedException {
     
    String line = lineText.toString();
    
    if(line.contains("Product")) {
        String productcode = line.split(",")[0];
        
        float orderDemand = Float.parseFloat(line.split(",")[4]);
        
        context.write(new Text(productcode), new IntWritable((int) orderDemand));
    }
  }
}

FrequencyReducer.java:

package ma.test.a02;

import java.io.IOException;

import javax.xml.soap.Text;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class FrequencyReducer extends Reducer< Text ,  IntWritable ,  IntWritable ,  FloatWritable > {
     public void reduce( IntWritable productcode,  Iterable<IntWritable> orderDemands,  Context context)
         throws IOException,  InterruptedException {
             
      float averageDemand  = 0;
      float count = 0;
      for ( IntWritable orderDemand : orderDemands) {
          
            averageDemand +=orderDemand.get();
            count +=1;
        }
      
      float result = averageDemand / count;
    
      context.write(productcode,  new FloatWritable (result));
    }
}

Frequency.java (Driver):

package ma.test.a02;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Frequency {
 
  public static void main(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.println("Usage: Average <input path> <output path>");
      System.exit(-1);
    }
    
    // create a Hadoop job and set the main class
    Job job = Job.getInstance();
    job.setJarByClass(Frequency.class);
    job.setJobName("MA-Test Average");
    
    // set the input and output path
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
    // set the Mapper and Reducer class
    job.setMapperClass(FrequencyMapper.class);
    job.setReducerClass(FrequencyReducer.class);
    
    // specify the type of the output
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    
    // run the job
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Hint 1: In the mapper you filter for lines that contain "VOLUME":

if(line.contains("VOLUME")) {

}

But no line contains "VOLUME", so the reducer gets no input!

Hint 2: Your reducer's output value is a FloatWritable, so you should use this line in your runner (the Frequency class):

job.setOutputValueClass(FloatWritable.class);

instead of this one:

job.setOutputValueClass(IntWritable.class);

Hint 3: The reducer's input key/value types must match the mapper's output types (Text and IntWritable), and since you write the product code as the output key, the output key type should be Text as well. Change this line in the reducer:

public class FrequencyReducer extends Reducer<Text, IntWritable, IntWritable, FloatWritable>

to this:

public class FrequencyReducer extends Reducer<Text, IntWritable, Text, FloatWritable>

and give the reduce method a matching key parameter: reduce(Text productcode, ...) instead of reduce(IntWritable productcode, ...).

Also add these lines to the Frequency class:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
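
To see why these type parameters have to line up, here is a minimal sketch of the correspondence (using the class names from this question):

// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: emits (Text, IntWritable) pairs
public class FrequencyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { }

// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: consumes (Text, IntWritable) pairs,
// writes (Text, FloatWritable) pairs
public class FrequencyReducer extends Reducer<Text, IntWritable, Text, FloatWritable> { }

setMapOutputKeyClass/setMapOutputValueClass in the driver must name the mapper's KEYOUT/VALUEOUT, and setOutputKeyClass/setOutputValueClass must name the reducer's KEYOUT/VALUEOUT.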

Hint 4: The first line of the csv file, which describes the column structure, causes problems. Reject it by putting the following lines at the very beginning of the map method:

if(line.contains("Product_Code,Warehouse")) {
    return;
}
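
An alternative sketch (assuming plain text input, where the LongWritable key passed to map is the byte offset of the line within its file, so the header is the record at offset 0):

if (offset.get() == 0) {
    return; // first line of the file: the csv header
}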

Hint 5: In a real program, make sure you have a plan for rows whose Order_Demand String cannot be cast to an Integer in the map method.
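
For example, a minimal guard could look like this (silently skipping the bad row is an assumption; you could also count rejected records with a counter, as in the commented line):

if (line.contains("Product")) {
    String[] fields = line.split(",");
    try {
        int orderDemand = Integer.parseInt(fields[4].trim());
        context.write(new Text(fields[0].trim()), new IntWritable(orderDemand));
    } catch (NumberFormatException e) {
        // malformed or non-numeric Order_Demand: skip this row
        // context.getCounter("quality", "bad_rows").increment(1);
        return;
    }
}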

Finally, your mapper will be:

public class FrequencyMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    public void map(LongWritable offset, Text lineText, Context context)
            throws IOException, InterruptedException {

        String line = lineText.toString();

        if (line.contains("Product_Code,Warehouse")) {
            return;
        }

        if (line.contains("Product")) {
            String productcode = line.split(",")[0].trim();
            int orderDemand = Integer.valueOf(line.split(",")[4].trim());
            context.write(new Text(productcode), new IntWritable(orderDemand));
        }
    }
}
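
A small optional refinement, commonly seen in Hadoop mappers (not required for correctness): reuse the output writables instead of allocating new objects on every map() call. The framework serializes the pair inside context.write, so reuse is safe:

public class FrequencyMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    // reused across all map() calls to avoid per-record allocations
    private final Text outKey = new Text();
    private final IntWritable outValue = new IntWritable();

    @Override
    public void map(LongWritable offset, Text lineText, Context context)
            throws IOException, InterruptedException {

        String line = lineText.toString();

        if (line.contains("Product_Code,Warehouse")) {
            return;
        }

        if (line.contains("Product")) {
            outKey.set(line.split(",")[0].trim());
            outValue.set(Integer.parseInt(line.split(",")[4].trim()));
            context.write(outKey, outValue);
        }
    }
}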

And here is your reducer:

public class FrequencyReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
    @Override
    public void reduce(Text productcode, Iterable<IntWritable> orderDemands, Context context)
            throws IOException, InterruptedException {

        // sum all demands for this product and count the rows
        float totalDemand = 0;
        float count = 0;
        for (IntWritable orderDemand : orderDemands) {
            totalDemand += orderDemand.get();
            count += 1;
        }

        // the average is the total demand divided by the number of rows
        float result = totalDemand / count;

        context.write(productcode, new FloatWritable(result));
    }
}

And here is your runner:

public class Frequency {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: Average <input path> <output path>");
            System.exit(-1);
        }
        
        // create a Hadoop job and set the main class
        Job job = Job.getInstance();
        job.setJarByClass(Frequency.class);
        job.setJobName("MA-Test Average");

        // set the input and output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // set the Mapper and Reducer class
        job.setMapperClass(FrequencyMapper.class);
        job.setReducerClass(FrequencyReducer.class);

        // specify the type of the output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
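
A quick way to try it out (the jar name and HDFS paths here are placeholders; adjust them to your setup):

hadoop jar ma-test-a02.jar ma.test.a02.Frequency /user/me/demand.csv /user/me/avg-out
hdfs dfs -cat /user/me/avg-out/part-r-00000

With only the five sample rows above, the tab-separated output would be:

Product_0979    500.0
Product_0993    100.0
Product_1157    160000.0
Product_1159    50000.0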