如何将 Java 程序重写为 Hadoop 作业?

How to rewrite a Java program as a Hadoop job?

必须对 Java 程序进行哪些 绝对 最小修改才能使其适合 map-reduce?

这是我的 Java 程序:

import java.io.*;

class evmTest {

public static void main(String[] args) {

    try {

        Runtime rt = Runtime.getRuntime();
        String command = "evm --debug --code 7f00000000000000000000000000000000000000000000000000000000000000027f00000000000000000000000000000000000000000000000000000000000000027f00000000000000000000000000000000000000000000000000000000000000020101 run";
        Process proc = rt.exec(command);

        BufferedReader stdInput = new BufferedReader(new 
             InputStreamReader(proc.getInputStream()));

        BufferedReader stdError = new BufferedReader(new 
             InputStreamReader(proc.getErrorStream()));

        // read the output from the command
        System.out.println("Here is the standard output of the command:\n");
        String s = null;
        while ((s = stdInput.readLine()) != null) {
            System.out.println(s);
        }

        // read any errors from the attempted command
        System.out.println("Here is the standard error of the command (if any):\n");
        while ((s = stdError.readLine()) != null) {
            System.out.println(s);
        }

    } catch (IOException e) {
        System.out.println(e);
    }

}

}

它打印终端的输出,以这种方式呈现:

Here is the standard output of the command:

0x
Here is the standard error of the command (if any):

#### TRACE ####
PUSH32          pc=00000000 gas=10000000000 cost=3

PUSH32          pc=00000033 gas=9999999997 cost=3
Stack:
00000000  0000000000000000000000000000000000000000000000000000000000000002

PUSH32          pc=00000066 gas=9999999994 cost=3
Stack:
00000000  0000000000000000000000000000000000000000000000000000000000000002
00000001  0000000000000000000000000000000000000000000000000000000000000002

ADD             pc=00000099 gas=9999999991 cost=3
Stack:
00000000  0000000000000000000000000000000000000000000000000000000000000002
00000001  0000000000000000000000000000000000000000000000000000000000000002
00000002  0000000000000000000000000000000000000000000000000000000000000002

ADD             pc=00000100 gas=9999999988 cost=3
Stack:
00000000  0000000000000000000000000000000000000000000000000000000000000004
00000001  0000000000000000000000000000000000000000000000000000000000000002

STOP            pc=00000101 gas=9999999985 cost=0
Stack:
00000000  0000000000000000000000000000000000000000000000000000000000000006

#### LOGS ####

当然,这是 Apache 示例中最简单的 map-reduce 作业之一:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

我的问题是 - 映射简化我在此 post 顶部共享的 Java 程序的最简单方法是什么?


更新

运行 使用此命令:

$HADOOP_HOME/bin/hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar -D mapreduce.job.reduces=0 -input /input_0 -output /steaming-output -mapper ./mapper.sh

导致此错误:

开始 运行 遇到问题:

17/09/26 03:26:56 INFO mapreduce.Job: Task Id : attempt_1506277206531_0004_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: Error in configuring object

来自服务器的附加信息:

因此,这不是试图为您提供解决方案,而是推动您朝着应该前进的方向前进。

如前所述,要先做点事情

假设您有一些这样的文件放在 hdfs:///input/codes.txt

7f0000000002812
7f000000000281a
7f000000000281b
7f000000000281c

非常 "simple" WordCount 代码实际上可以处理这些数据!但是,显然你不需要计算任何东西,你甚至不需要减速器。你有一个只有地图的工作将开始这样的事情。

private final Runtime rt = Runtime.getRuntime();

public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {
    String command = "evm --debug --code " + value.toString() + " run";
    Process proc = rt.exec(command);

    context.write( ... some_key, some_value ...);
}

但是,您真的根本不需要 Java。您有一个 shell 命令,因此您可以使用 Hadoop Streaming 将其 运行 和 "stream" 来自 HDFS 的代码转换为 stdin 用于您的脚本。

那个映射器看起来像这样。

#!/bin/bash
### mapper.sh

while read code; do
   evm --debug --code $code run
done

您甚至可以在没有 Hadoop 的情况下 本地 测试该代码(如果您确实需要 Hadoop 的开销,无论如何您都应该尝试做一个基准测试)

mapper.sh < codes.txt

由您决定,哪个选项最有效...对于极简主义者来说,Hadoop 流看起来更简单。

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming*.jar \
    -D mapreduce.job.reduces=0 \
    -input /input \
    -output /tmp/steaming-output \
    -mapper ~/mapper.sh

另外值得一提的是 - 任何标准输出/标准错误都将收集到 YARN 应用程序日志中,不一定返回到 HDFS。