MapReduce编程过滤大输入文件

MapReduce programming to filter a large input file

我有一个非常大的输入文本文件,格式如下:

ID \t time \t product \t Description \t Status

状态列仅限于包含小写字母 a、s、i 或大写字母 A、S、I 或两者的混合(状态列中的样本元素:a、si、I、asi、ASI , aSI, Asi...)

我想实现的是使用MapReduce根据Status过滤掉这个输出文件。我想丢弃原始文件中状态至少有 1 个大写字母的所有行。换句话说,我只关心状态中所有小写字母的行。

我是 MapReduce 编程的新手,需要一些帮助。以下是我到目前为止的想法

我的mapper.py是:

import sys
import re

for line in sys.stdin:
    line = line.strip()
    portions = re.split(r'\t+', line)
    status = portions[-1] #pop out last item (status info) from portions list
    #Now I want to emit key as status and value as the portions list
    print '%s\t%s' % (status, portions) #obviously, I don't think it's correct, I got stuck at this part

我的 reducer.py 是:

import sys

#I'm assuming that I have read in status and portions.
#In my understanding, the number of output files depend on the number of reducers. But what I want is to discard all rows that has at least 1 upper case letter in status bar.
#The file output should be a single file with rows that have all lower case letters in status bar.

我认为检查状态是否至少有 1 个大写字母并不难,但我被困在如何丢弃不感兴趣的行并将所有输出文件合并到一个与原始格式相同的文件中文本文件。

非常感谢任何帮助我走上正确道路的帮助。提前致谢!

你完全可以在没有减速器的情况下做到这一点,

你的映射器中有这样的东西:

import sys
import re

for line in sys.stdin:
    line = line.strip()
    portions = re.split(r'\t+', line)
    status = portions[-1]
    if status.islower():
        whatever_you_want_to_write = status + ',' + portions #whatever
        sys.stdout.write(whatever_you_want_to_write)

有关 reading/writing 到 HDFS 的详细信息,请参阅 Hadoop Streaming 的文档。

类似这样的东西,例如:

$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \
    -input myInputDirs \
    -output myOutputDir \
    -mapper myPythonScript.py \
    -jobconf mapred.reduce.tasks=0 \
    -file myPythonScript.py

请注意如何指定 -jobconf mapred.reduce.tasks=0 来告诉 Hadoop 您不需要 reduce 步骤。