Determine the cause of a Hadoop error in code, as the standard logs are inconclusive: file splits, container memory, or block size

For some time now I have been combing through the log4j logs trying to determine why my Hadoop job is crashing.

Essentially, what the job tries to do is issue a command on the underlying machine and collect the output of that command. At the moment all of these steps happen in the map task (later I will attempt to reduce over the sum of these individual outputs).

The behaviour I am seeing is this: up to a certain number of outputs produced into the BufferedReader (28, to be precise), everything works and the job completes almost immediately. However, when I increase that number to 29, the map task hangs at 67% completion; across three attempts it always stalls at 67% and is eventually killed for lack of progress.

From the NameNode that submitted the job we can see the following output:

17/10/09 15:19:29 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/10/09 15:19:29 INFO input.FileInputFormat: Total input files to process : 1
17/10/09 15:19:30 INFO mapreduce.JobSubmitter: number of splits:1
17/10/09 15:19:30 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1507562353923_0001
17/10/09 15:19:30 INFO impl.YarnClientImpl: Submitted application application_1507562353923_0001
17/10/09 15:19:30 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1507562353923_0001/
17/10/09 15:19:30 INFO mapreduce.Job: Running job: job_1507562353923_0001
17/10/09 15:19:36 INFO mapreduce.Job: Job job_1507562353923_0001 running in uber mode : false
17/10/09 15:19:36 INFO mapreduce.Job:  map 0% reduce 0%
17/10/09 15:19:53 INFO mapreduce.Job:  map 67% reduce 0%
17/10/09 15:30:05 INFO mapreduce.Job: Task Id : attempt_1507562353923_0001_m_000000_0, Status : FAILED
AttemptID:attempt_1507562353923_0001_m_000000_0 Timed out after 600 secs
Sent signal OUTPUT_THREAD_DUMP (SIGQUIT) to pid 6230 as user ubuntu for container container_1507562353923_0001_01_000002, result=success
Container killed by the ApplicationMaster.
Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

17/10/09 15:30:06 INFO mapreduce.Job:  map 0% reduce 0%
17/10/09 15:30:26 INFO mapreduce.Job:  map 67% reduce 0%
17/10/09 15:40:36 INFO mapreduce.Job: Task Id : attempt_1507562353923_0001_m_000000_1, Status : FAILED
AttemptID:attempt_1507562353923_0001_m_000000_1 Timed out after 600 secs
17/10/09 15:40:37 INFO mapreduce.Job:  map 0% reduce 0%
17/10/09 15:40:52 INFO mapreduce.Job:  map 67% reduce 0%
17/10/09 15:51:05 INFO mapreduce.Job: Task Id : attempt_1507562353923_0001_m_000000_2, Status : FAILED
AttemptID:attempt_1507562353923_0001_m_000000_2 Timed out after 600 secs
17/10/09 15:51:06 INFO mapreduce.Job:  map 0% reduce 0%
17/10/09 15:51:24 INFO mapreduce.Job:  map 67% reduce 0%
17/10/09 16:01:37 INFO mapreduce.Job:  map 100% reduce 100%
17/10/09 16:01:37 INFO mapreduce.Job: Job job_1507562353923_0001 failed with state FAILED due to: Task failed task_1507562353923_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

17/10/09 16:01:37 INFO mapreduce.Job: Counters: 13
    Job Counters 
        Failed map tasks=4
        Killed reduce tasks=1
        Launched map tasks=4
        Other local map tasks=3
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=5025782
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=2512891
        Total time spent by all reduce tasks (ms)=0
        Total vcore-milliseconds taken by all map tasks=2512891
        Total vcore-milliseconds taken by all reduce tasks=0
        Total megabyte-milliseconds taken by all map tasks=10292801536
        Total megabyte-milliseconds taken by all reduce tasks=0

To determine the root cause of this problem I have spent a lot of time poring over the log4j output logs, but they are inconclusive. For example, the logs of successful and unsuccessful jobs look almost identical, with one small caveat: the unsuccessful jobs terminate abruptly like this:

The most interesting part seems to be here (taken from one of the earlier attempts; Hadoop always makes three attempts to complete the job):

2017-10-09 15:40:35,821 WARN [main] OpcodeCount$TokenizerMapper: Code: opAdd, Time: 214
2017-10-09 15:40:35,821 WARN [main] OpcodeCount$TokenizerMapper: Code: opAdd, Time: 450
2017-10-09 15:40:35,821 WARN [main] OpcodeCount$TokenizerMapper: Code: opAdd, Time: 217
2017-10-09 15:40:35,821 WARN [main] OpcodeCount$TokenizerMapper: Code: opStop, Time: 165
2017-10-09 15:40:35,851 INFO [main] org.apache.hadoop.mapred.MapTask: Starting flush of map output
2017-10-09 15:40:35,851 INFO [main] org.apache.hadoop.mapred.MapTask: Spilling map output
2017-10-09 15:40:35,851 INFO [main] org.apache.hadoop.mapred.MapTask: bufstart = 0; bufend = 1977; bufvoid = 104857600
2017-10-09 15:40:35,851 INFO [main] org.apache.hadoop.mapred.MapTask: kvstart = 26214396(104857584); kvend = 26214396(104857584); length = 1/6553600
(&container_1507562353923_0001_01_000005???stderr0stdout105912017-10-09 16:01:35
Full thread dump OpenJDK 64-Bit Server VM (25.131-b11 mixed mode):

The words "Full thread dump" led me to believe that my MapOutputBuffer was filling up, as described in this article. I tried the recommended solution of increasing the configured sizes, i.e. raising mapreduce.task.io.sort.mb to 1000 and mapreduce.map.sort.spill.percent to 0.99, but this had no positive effect.

The next thing I tried was changing the program so that it no longer emits its debug/sanity-check output through log4j, but instead writes it out as described here, but that did not work either.


At the moment my ideas about what might be wrong with this program are as follows:

1) The MapOutputBuffer is filling up, causing the program to stall

I should mention that it is important that a single file is executed as one atomic map task; if the commands in the file are broken up into smaller components, the operations it issues to the underlying OS no longer make sense. Hence:

2) The input file is being split up inside HDFS and is not being read back in as one contiguous unit in the correct order

3) The command is being split across containers, with one container issuing one part of the command and another container responsible for the rest, leading to the problem I described above.
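To rule out possibility 2, one option (a sketch, assuming the job reads its input through FileInputFormat and Hadoop 2.x property names) is to force a single split by setting the minimum split size above the size of any input file:

```xml
<!-- mapred-site.xml / job configuration fragment (assumption: Hadoop 2.x) -->
<property>
  <name>mapreduce.input.fileinputformat.split.minsize</name>
  <!-- larger than any input file, so FileInputFormat produces one split -->
  <value>9223372036854775807</value>
</property>
```

That said, the submission log above already reports "number of splits:1", so for this particular input the whole file appears to be handed to a single mapper anyway.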


So, since the logs are inconclusive, my next goal is to change my code in some way that allows me to isolate, test, and eliminate each of these possibilities. Any ideas on how I could do that efficiently, and any and all comments, insights, or remarks on anything else I have written here, would be greatly appreciated.
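As a first isolation step, the parsing logic can be lifted out of the mapper and exercised standalone against captured evm output, with no cluster involved (a sketch; the sample line in main is a stand-in for real evm --debug output, and note the doubled backslashes that Java string literals require in the pattern):

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OpcodeLineParser {

    // Same pattern as the mapper; \\t and \\d must be doubled in a Java literal
    private static final Pattern LINE = Pattern.compile("([A-Za-z]+)([ \\t]+)(\\d+)");

    // Extract (opcode, time) pairs from captured console output
    static List<Map.Entry<String, Integer>> parse(Iterable<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (String line : lines) {
            Matcher m = LINE.matcher(line);
            while (m.find()) {
                pairs.add(new AbstractMap.SimpleEntry<>(m.group(1), Integer.valueOf(m.group(3))));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // hypothetical sample resembling lines of 'evm --debug' output
        List<Map.Entry<String, Integer>> result = parse(Arrays.asList("opAdd    214", "opStop 165"));
        System.out.println(result);
    }
}
```

Running this against the saved output of a 28-command file and a 29-command file would show whether the parsing stage behaves differently, independently of HDFS and YARN.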

The map task is currently written like this:

  public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // declare logger
    private final Logger LOG = org.apache.log4j.Logger.getLogger(this.getClass());

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

      // output data struct
      List<Map.Entry<String,Integer>> pairList = new ArrayList<>();

      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
          try {
              // command execution
              String evmDir = "/home/ubuntu/go/src/github.com/ethereum/go-ethereum/build/bin/evm";
              String command = evmDir + " --debug --code " + value.toString() + " run";
              Process proc = Runtime.getRuntime().exec(command);

              LOG.warn(command);

              BufferedReader stdInput = new BufferedReader(new InputStreamReader(proc.getInputStream()));
              BufferedReader stdError = new BufferedReader(new InputStreamReader(proc.getErrorStream()));

              // define and initialise representation to hold 'evm' command output
              ArrayList<String> consoleOutput = new ArrayList<String>();
              ArrayList<String> debugOutput   = new ArrayList<String>();

              String s = null;
              while ((s = stdInput.readLine()) != null) {
                  consoleOutput.add(s);
              }
              while ((s = stdError.readLine()) != null) {
                  debugOutput.add(s);
              }

              for (String p : consoleOutput) {
                  Pattern pattern = Pattern.compile("([A-Za-z]+)([ \\t]+)(\\d+)");
                  Matcher matcher = pattern.matcher(p);
                  while (matcher.find()) {
                      String opcodeName = matcher.group(1);
                      Integer executionStepTime = Integer.valueOf(matcher.group(3));
                      // add component pieces of line to output data structure
                      pairList.add(new AbstractMap.SimpleEntry<>(opcodeName, executionStepTime));
                  }
              }

          } catch (IOException e) {
              //LOG.warn(e);
              LOG.warn("Exception Encountered! " + e);
          }

          // log output for debugging
          for (Map.Entry<String, Integer> entry : pairList) {
              String opcodeRep = entry.getKey().toString();
              Integer stepTime = entry.getValue();
              LOG.warn("Code: " + opcodeRep + ", Time: " + stepTime);
          }

          word.set(itr.nextToken());
          context.write(word, one);
      }
    }
  }
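One thing worth checking in the mapper above: it drains stdout to completion before touching stderr. With --debug, the evm process may write a lot to stderr; once the stderr pipe buffer fills, the child blocks, stdout never reaches end-of-stream, and the task hangs without making progress, which would be consistent with the 600-second timeout. This is only a hypothesis, but it is cheap to test with a sketch that drains both streams concurrently (plain Java, with sh/echo standing in for the real evm invocation):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class StreamDrainer {

    // Drain one stream on its own thread so the child process can never
    // block on a full pipe buffer while the parent is busy with the other stream.
    static Thread drain(InputStream in, List<String> sink) {
        Thread t = new Thread(() -> {
            try (BufferedReader r = new BufferedReader(new InputStreamReader(in))) {
                String line;
                while ((line = r.readLine()) != null) {
                    sink.add(line);
                }
            } catch (IOException ignored) {
            }
        });
        t.start();
        return t;
    }

    // Run a command, capturing stdout and stderr concurrently, with a bounded wait.
    static List<List<String>> run(String... command) throws Exception {
        Process proc = new ProcessBuilder(command).start();
        List<String> out = new ArrayList<>();
        List<String> err = new ArrayList<>();
        Thread tOut = drain(proc.getInputStream(), out);
        Thread tErr = drain(proc.getErrorStream(), err);
        proc.waitFor(30, TimeUnit.SECONDS); // bounded wait instead of hanging forever
        tOut.join();
        tErr.join();
        return Arrays.asList(out, err);
    }

    public static void main(String[] args) throws Exception {
        // 'sh -c echo ...' is a stand-in for the evm command in the mapper
        List<List<String>> streams = run("sh", "-c", "echo out-line; echo err-line 1>&2");
        System.out.println("stdout: " + streams.get(0));
        System.out.println("stderr: " + streams.get(1));
    }
}
```

If the 29-command case completes when the streams are drained this way, the hang was a pipe-buffer deadlock rather than anything Hadoop-specific.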

Perhaps this additional information from the logs is relevant to correctly diagnosing the problem:

"VM Thread" os_prio=0 tid=0x00007f0a1007a000 nid=0x7328 runnable 

"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x00007f0a10026000 nid=0x7326 runnable 

"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x00007f0a10027800 nid=0x7327 runnable 

"VM Periodic Task Thread" os_prio=0 tid=0x00007f0a100bc800 nid=0x732f waiting on condition 

JNI global references: 277

Heap
 PSYoungGen      total 100352K, used 7502K [0x0000000780000000, 0x0000000788a00000, 0x00000007c0000000)
  eden space 94720K, 2% used [0x0000000780000000,0x0000000780216d78,0x0000000785c80000)
  from space 5632K, 95% used [0x0000000788480000,0x00000007889bcbc8,0x0000000788a00000)
  to   space 8192K, 0% used [0x0000000787a00000,0x0000000787a00000,0x0000000788200000)
 ParOldGen       total 143360K, used 113628K [0x0000000700000000, 0x0000000708c00000, 0x0000000780000000)
  object space 143360K, 79% used [0x0000000700000000,0x0000000706ef71d0,0x0000000708c00000)
 Metaspace       used 25981K, capacity 26224K, committed 26496K, reserved 1073152K
  class space    used 3019K, capacity 3083K, committed 3200K, reserved 1048576K

To find out exactly where the mapper is stuck, you can use jstack [to take a thread dump].

jstack ships with the JDK, and you can use it on the stuck mapper process as follows.

Step 0: Find the hostname on which your map task is running and note the task_id.

Step 1: Log in to the node and run

ps aux | grep task_id

and identify the process ID and the username of the process; it will be associated with /usr/java/jdk/bin/java.

Step 2: su to the username that owns the process.

Step 3: Export the Java home and bin path [example: export JAVA_HOME=/usr/java/jdk1.7.0_67 && export PATH=$JAVA_HOME/bin:$PATH]

Step 4: Replace pid with the pid you obtained in Step 1:

export PID=pid
for i in $(seq 1 10); do echo "Jstack Iteration $i"; jstack $PID > /tmp/hungtask-hostname-${PID}.jstack.$i; sleep 5s; done
tar zcvf hungtask.tar.gz /tmp/hungtask-hostname-${PID}.jstack.*

hungtask.tar.gz will contain thread dumps of the process taken at 5-second intervals, ten times in total. You will want to run the script while the task is in the hung state.

After that, if you can upload hungtask.tar.gz to this thread, I can take a look and share my observations.

Also, to find out whether the process is doing frequent GC, you can try the following command:

jstat -gc -t PID STEP

where PID is the process ID of the Java process to monitor and STEP is the sample interval.

You can paste the output into the site http://nix-on.blogspot.in/2015/01/java-jstat-how-to-visualize-garbage.html to see whether there is excessive GC.