Hadoop MapReducer 字符串和数学运算作为输出 - 错误

Hadoop MapReducer string and math operations as output - error

我正在构建 mapreducer 应用程序,它将带有随机数的 .txt 作为输入,我想接收这样的输出信息:

最大数量:xx 算术平均值:xx 几何平均值:xx 中位数:xx

我的代码:

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NumCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text number = new Text();
    

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        number.set(itr.nextToken());
        context.write(number, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {

      List<Integer> numList = new ArrayList<Integer>();

      for (IntWritable val : values) {
         numList.add(val.get());
      }
    
      // Max number from file
      int maxNumber = Collections.max(numList,null);

      // Arithmetic average
      float sum = 0;
      for (int i : numList)
        sum += i;

      float arithmeticAverage = sum / numList.size();

      // Geometric average
      sum = 1;
      for (int i : numList)
        sum *= i;
      
      double geometricAverage = Math.pow(sum, (float)1/numList.size());

      // Median

      float median;
      
      if (numList.size() % 2 == 0)
         median = (float)(numList.get(numList.size()/2) + numList.get(numList.size()/2 - 1))/2;
      else
         median = numList.get(numList.size()/2);

      String summary = "Max number: " + maxNumber + "\nArithmetic avg: " + arithmeticAverage + "\nGeometric avg: " + geometricAverage + "\nMedian" + median;

      result.set(summary);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "number count");
    job.setJarByClass(NumCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

我的代码的问题是,我收到无法将字符串放入 IntWritable 的错误(看起来合乎逻辑,但我如何解析字符串值以输出?)

result.set(summary);

更重要的是,当我尝试做这样的事情时:

result.set(median);

我没有收到中值,而是收到了错误的输出,这是输入文件中附近带有“1”的数字列表。

我对 hadoop 完全陌生,我不知道如何正确地做这件事,有什么建议吗? ;x

因为你有String summary,显然答案是使用Text而不是IntWritable...如果你有多个则不要使用IntWritable值为 return,其中的倍数不是整数。

另外,这个逻辑甚至是不正确的,因为所有相等的数字最终都在同一个 reducer 中,所以“maxNumber”永远不会是整体最大值,例如,因此你会有相同的 reducer 输出值作为唯一的输入值。解决方案是使用 NullWritable 作为缩减器键(和映射器键输出),强制所有数字进入一个缩减器,这样它们就可以是 maxed/averaged/summed,等等。你也不需要 numList因为 Iterable<IntWritable> 已经可以迭代了;你应该只需要一个循环来完成所有的计算,除了中位数,你需要先对数字进行排序。

我个人的建议是使用 Spark 或 Hive 进行统计分析,而不是准系统 Mapreduce...