Word count sorting in MapReduce Python using yarn comparator
I want to solve the word count problem and get the result sorted in descending order of how often each word occurs in the file.
Below are the four files I wrote for this (2 mappers and 2 reducers, because a single MapReduce job cannot solve the problem: the shuffle sorts by word, not by count, so a second job is needed to re-sort by count):
1) mapper1.py
import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

for line in sys.stdin:
    try:
        article_id, text = unicode(line.strip()).split('\t', 1)
    except ValueError as e:
        continue  # skip malformed lines
    words = re.split("\W*\s+\W*", text, flags=re.UNICODE)
    for word in words:
        # emit one (word, 1) pair per occurrence
        print "%s\t%d" % (word.lower(), 1)
2) reducer1.py
import sys

current_key = None
word_sum = 0

for line in sys.stdin:
    try:
        key, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue  # skip malformed lines
    if current_key != key:
        # input arrives sorted by key, so a key change means the
        # previous word's total is complete
        if current_key:
            print "%s\t%d" % (current_key, word_sum)
        word_sum = 0
        current_key = key
    word_sum += count

if current_key:
    print "%s\t%d" % (current_key, word_sum)
3) mapper2.py
import sys
import re

reload(sys)
sys.setdefaultencoding('utf-8')  # required to convert to unicode

# pass-through mapper: re-emit each "word<TAB>count" record unchanged;
# the framework's comparator is expected to do the actual sorting
for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    print "%s\t%d" % (word, count)
4) reducer2.py
import sys

# pass-through reducer: records arrive already sorted by the comparator
for line in sys.stdin:
    try:
        word, count = line.strip().split('\t', 1)
        count = int(count)
    except ValueError as e:
        continue
    print "%s\t%d" % (word, count)
Below are the two yarn commands I run from bash:
OUT_DIR="wordcount_result_1"
NUM_REDUCERS=8
hdfs dfs -rm -r -skipTrash ${OUT_DIR} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount" \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper1.py,reducer1.py \
-mapper "python mapper1.py" \
-combiner "python reducer1.py" \
-reducer "python reducer1.py" \
-input /test/articles-part-short \
-output ${OUT_DIR} > /dev/null
OUT_DIR_2="wordcount_result_2"
NUM_REDUCERS=1
hdfs dfs -rm -r -skipTrash ${OUT_DIR_2} > /dev/null
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount Rating" \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D map.output.key.field.separator=\t \
-D mapreduce.partition.keycomparator.options=-k2,2nr \
-D mapreduce.job.reduces=${NUM_REDUCERS} \
-files mapper2.py,reducer2.py \
-mapper "python mapper2.py" \
-reducer "python reducer2.py" \
-input ${OUT_DIR} \
-output ${OUT_DIR_2} > /dev/null
hdfs dfs -cat ${OUT_DIR_2}/part-00000 | head
This does not give me the correct answer. Can someone explain what is going wrong?
On the other hand, if in mapper2.py I print like this,
print "%d\t%s" % (count, word)
and in reducer2.py I read like this,
count, word = line.strip().split('\t', 1)
and change the option in the second yarn command to
-D mapreduce.partition.keycomparator.options=-k1,1nr
then it gives me the correct answer.
Why do the two cases behave differently?
Can someone help me understand Hadoop MapReduce's comparator options?
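The crucial detail is that KeyFieldBasedComparator sorts map output keys, not whole records, and by default Streaming treats only the text before the first tab as the key. A rough illustration of what the comparator effectively gets to compare in each case, using GNU sort as a stand-in (my own sketch; -s makes the fallback behaviour visible):

# Case 1: records are "word<TAB>count", default key = field 1 only,
# so the comparator sees bare words and -k2,2nr finds no field 2;
# all keys compare equal and the stable sort changes nothing.
printf 'hello\nworld\nfoo\n' | sort -s -t $'\t' -k2,2nr

# Case 2: records are "count<TAB>word", so the count sits inside the
# default key and -k1,1nr sorts on real numeric data.
printf '3\thello\n7\tworld\n1\tfoo\n' | sort -t $'\t' -k1,1nr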
This will work:
yarn jar /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-streaming.jar \
-D mapred.job.name="Streaming wordCount rating" \
-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator \
-D mapreduce.partition.keycomparator.options='-k2nr' \
-D stream.num.map.output.key.fields=2 \
-D mapred.map.tasks=1 \
-D mapreduce.job.reduces=1 \
-files mapper2.py,reducer2.py \
-mapper "python mapper2.py" \
-reducer "python reducer2.py" \
-input /user/jovyan/assignment0_1563877099149160 \
-output ${OUT_DIR} > /dev/null
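Why this works: stream.num.map.output.key.fields=2 tells Streaming to treat the first two tab-separated fields, i.e. the whole "word<TAB>count" record, as the key, so the comparator's -k2nr now has a numeric second field to sort on; mapred.map.tasks=1 and a single reducer then give one totally ordered output file. A local stand-in for what the comparator now sees (my sketch):

# with the whole record as the key, field 2 really exists
printf 'hello\t3\nworld\t7\nfoo\t1\n' | sort -t $'\t' -k2,2nr
# world  7
# hello  3
# foo    1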