Error while running MapReduce code although Python scripts work locally

I have a mapper.py and a reducer.py that process an input file, which is just a regular Linux text file in the following format:

ID \t time \t duration \t Description \t status

Basically I want to group my records by ID in the reducer, so I constructed the mapper as follows:

#!/usr/bin/env python

import sys
import re

for line in sys.stdin:
    #remove leading and trailing whitespace
    line = line.strip()
    #split the line into portions
    portions = re.split(r'\t+',line)
    #take the first column (which is block number) to emit as key
    block = portions[0]
    print '%s\t%s\t%s\t%s\t%s' % (block,portions[1],portions[2],portions[3],portions[4])

Then in the reducer I do some data processing, as follows:

#!/usr/bin/env python

from operator import itemgetter
import sys

bitmapStr = ""
current_block = None
block = start = duration = precision = status = ""
round = 0  # interval is every 11 mins or 660 seconds

for line in sys.stdin:
    line = line.strip()
    block, start, duration, precision, status = line.split('\t')

    if current_block == block:
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660

        # amount of time that exceeds this block record
        round = round - duration

    else:
        if current_block:
            print '%s\t%s' % (current_block, bitmapStr)
        round = 0
        bitmapStr = ""
        current_block = block
        duration = int(duration)
        while round < duration:
            if status.islower():
                bitmapStr = bitmapStr + "1"
            else:
                bitmapStr = bitmapStr + "0"
            round = round + 660
        # amount of time that exceeds this block record
        round = round - duration

# emit the final block
if current_block == block:
    print '%s\t%s' % (current_block, bitmapStr)
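The interval logic above can be pulled out into a small helper that is easy to test outside Hadoop (a sketch; the `extend_bitmap` name and the `(bitmap, leftover)` return shape are my own, not from the original reducer):

```python
def extend_bitmap(bitmap, offset, duration, status, interval=660):
    """Append one bit per `interval` seconds of `duration`, starting
    `offset` seconds into the current interval: '1' when status is
    lower-case, '0' otherwise. Returns the grown bitmap and the leftover
    offset, mirroring the reducer's `round = round - duration` step."""
    bit = "1" if status.islower() else "0"
    while offset < duration:
        bitmap += bit
        offset += interval
    return bitmap, offset - duration

# two 660-second intervals fit in 1320 seconds with nothing left over
print(extend_bitmap("", 0, 1320, "ok"))  # -> ('11', 0)
```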

I tested both the mapper and the reducer locally with:

cat small_data_sample | ./mapper.py | sort -k1,1 | ./reducer.py
#output is working as I expect

However, when I try to run it through Hadoop, it produces the following error:

Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:430)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

The exact command used to run the Hadoop job is:

bin/hadoop jar hadoop-streaming.jar \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
-D mapred.text.key.partitioner.options='-k1,1'  \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator -D mapred.text.key.comparator.options='-k1,1 -k2,2n' \
-D stream.num.map.output.key.fields=2 \
-input $hadoop_dir/data/sample \
-output $hadoop_dir/data/data_test1-output \
-mapper $dir/calBitmap_mapper.py \
-reducer $dir/calBitmap_reducer.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner

where $hadoop_dir is the path to my HDFS location, and $dir is where my mapper and reducer Python scripts live.

Please let me know what I need to correct. Thanks in advance!

*Edit: I tried a different (much smaller) input file and it seems to work fine, so I don't know why MapReduce breaks on the large input file.

I found the solution to the error. In the mapper I did not account for the different kinds of input: the first few lines of some of my inputs are comments, so indexing into the portions array failed with an index-out-of-range error. To fix it, I added a check:

if len(portions) == 5: #make sure it has 5 elements in there
    print '%s\t%s\t%s\t%s\t%s' % (block,portions[1],portions[2],portions[3],portions[4])
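Folding that guard into the mapper gives a defensive version along these lines (a sketch: the `map_line` helper is my own refactoring for testability; the five-column check is the actual fix from above):

```python
import re
import sys

def map_line(line):
    """Return the five tab-separated fields keyed by block ID, or None
    for comment/header lines that do not have all five columns."""
    portions = re.split(r'\t+', line.strip())
    if len(portions) != 5:  # skip malformed lines instead of crashing
        return None
    return '\t'.join(portions)

if __name__ == '__main__':
    for line in sys.stdin:
        out = map_line(line)
        if out is not None:
            print(out)
```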