Dataproc Hadoop MapReduce - can't get it to work
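I'm basically trying to run my first Hadoop MapReduce routine, and I have to use Hadoop and MapReduce because I'm doing this for a class project. I want to use Python for the mapper and reducer, since it is the language that both my peers and I are most comfortable with. The easiest way for me to get set up seemed to be a Google Dataproc instance, so I have that running as well. I'll describe what I have done and which resources I've used, but I'm relatively new to this and may be missing something.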
Dataproc configuration:
[Screenshots: Dataproc 1, Dataproc 2, Dataproc 3]
Then I'm able to SSH into my master node. I have my mapper.py and reducer.py files stored in a Google Cloud Storage bucket.
The mapper and reducer code is from this Michael Noll blog post, modified to work with Python 3.
mapper.py:
#!/usr/bin/env python
"""mapper.py"""
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")
reducer.py:
#!/usr/bin/env python
"""reducer.py"""
from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)
Finally, I SSH'ed into my master node and checked my Python version:
hduser@data-604-m:~$ python
Python 3.7.3 (default, Mar 27 2019, 22:11:17)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
I then ran the following (adapted from a linked example):
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
-mapper mapper.py \
-reducer reducer.py \
-input gs://data-604-hadoop/books/pg20417.txt \
-output gs://data-604-hadoop/output
With the following results:
hduser@data-604-m:~$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar -files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py -mapper mapper.py -reducer reducer.py -input gs://data-604-hadoop/books/pg20417.txt -output gs://data-604-hadoop/output
packageJobJar: [] [/usr/lib/hadoop-mapreduce/hadoop-streaming-2.9.2.jar] /tmp/streamjob4601880105330541890.jar tmpDir=null
19/11/12 02:10:46 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:47 INFO client.RMProxy: Connecting to ResourceManager at data-604-m/10.162.0.13:8032
19/11/12 02:10:47 INFO client.AHSProxy: Connecting to Application History server at data-604-m/10.162.0.13:10200
19/11/12 02:10:49 INFO mapred.FileInputFormat: Total input files to process : 1
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: number of splits:15
19/11/12 02:10:49 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
19/11/12 02:10:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1573523684358_0002
19/11/12 02:10:50 INFO impl.YarnClientImpl: Submitted application application_1573523684358_0002
19/11/12 02:10:50 INFO mapreduce.Job: The url to track the job: http://data-604-m:8088/proxy/application_1573523684358_0002/
19/11/12 02:10:50 INFO mapreduce.Job: Running job: job_1573523684358_0002
19/11/12 02:10:58 INFO mapreduce.Job: Job job_1573523684358_0002 running in uber mode : false
19/11/12 02:10:58 INFO mapreduce.Job: map 0% reduce 0%
19/11/12 02:11:10 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:10 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:19 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:20 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:28 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000002_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:30 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:37 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:39 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:40 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:48 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:49 INFO mapreduce.Job: map 80% reduce 0%
19/11/12 02:11:50 INFO mapreduce.Job: map 100% reduce 100%
19/11/12 02:11:50 INFO mapreduce.Job: Job job_1573523684358_0002 failed with state FAILED due to: Task failed task_1573523684358_0002_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0
19/11/12 02:11:50 INFO mapreduce.Job: Counters: 14
Job Counters
Failed map tasks=19
Killed map tasks=14
Killed reduce tasks=5
Launched map tasks=22
Other local map tasks=14
Rack-local map tasks=8
Total time spent by all maps in occupied slots (ms)=885928
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=221482
Total vcore-milliseconds taken by all map tasks=221482
Total megabyte-milliseconds taken by all map tasks=453595136
Map-Reduce Framework
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
19/11/12 02:11:50 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
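And, honestly, I'm not sure what to do at this point. I've spent quite a bit of time on this and feel like I'm hitting a brick wall, because I can't tell what is going wrong.
I have also tried: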
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
-mapper ./mapper.py \
-reducer ./reducer.py \
-input gs://data-604-hadoop/books/pg20417.txt \
-output gs://data-604-hadoop/output
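with similar results.
Any help is appreciated.
UPDATE:
I've tried a few more things, without success.
I tried moving my Python scripts onto the Hadoop cluster. I then tested them with head -n100 mobydick.txt | ./mapper.py | sort | ./reducer.py and they worked. In the comments below I mentioned that I looked at my shebang and changed it, but those changes were unsuccessful as well.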
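There are a few different things going on here, but the main one is that you can't necessarily assume that the system environment of each mapper/reducer task (running as a YARN container) is the same as the environment of your logged-in shell. In most cases many elements will intentionally differ (Java classpaths, for example). For Java-based MapReduce programs this normally works as intended, because you end up with similar environment variables and classpaths between the driver code that runs under the hadoop jar command and the executor code that runs on worker nodes in YARN containers. Hadoop streaming is a bit of an oddball, since it isn't as much of a first-class citizen in normal Hadoop usage.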
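Anyhow, in this case the main problem you're hitting is that the default Python when you log in to the cluster is the Conda distribution with Python 3.7, while the default Python version in the YARN environment that spawns the mapper/reducer tasks is actually Python 2.7. This is an unfortunate consequence of some legacy compatibility considerations in Dataproc. Since both scripts use f-strings, which Python 2.7 cannot parse, a failure at startup is consistent with the "subprocess failed with code 1" errors in your log. You can see the version difference in action by hacking a mapper to act as a dump of the environment info you need; for example, try running the following commands while SSH'd into your Dataproc cluster: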
echo foo > foo.txt
hdfs dfs -mkdir hdfs:///foo
hdfs dfs -put foo.txt hdfs:///foo/foo.txt
echo '#!/bin/bash' > info.sh
echo 'which python' >> info.sh
echo 'python --version 2>&1' >> info.sh
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-Dmapred.reduce.tasks=0 \
-Dmapred.map.tasks=1 \
-files info.sh \
-input hdfs:///foo \
-output hdfs:///info-output \
-mapper ./info.sh
Your local environment will show a different version of Python than what gets printed in hdfs:///info-output:
$ bash info.sh
/opt/conda/bin/python
Python 3.7.3
$ hdfs dfs -cat hdfs:///info-output/*
/usr/bin/python
Python 2.7.15+
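The same check can be done from Python itself, which reports the interpreter that actually executes the script rather than whatever python resolves to on the PATH. This is only a minimal sketch: the file name info_mapper.py is hypothetical, and it assumes the script is shipped with -files and used as -mapper ./info_mapper.py in the same way as info.sh above. It avoids f-strings so that it parses under both Python 2.7 and Python 3, and like info.sh it ignores its input.

```python
#!/usr/bin/env python
"""info_mapper.py (hypothetical name): report which Python runs this task."""
from __future__ import print_function

import os
import platform
import sys


def interpreter_report():
    """Return (key, value) pairs describing the running interpreter."""
    return [
        ("executable", sys.executable),
        ("version", platform.python_version()),
        ("PATH", os.environ.get("PATH", "")),
    ]


# Emit tab-separated records, the same shape a streaming mapper writes.
for key, value in interpreter_report():
    print("{0}\t{1}".format(key, value))
```

Running it once in the login shell and once as a streaming mapper, then comparing the two outputs, should show the same discrepancy as the info.sh experiment.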
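This means you can either make your mapper/reducer Python 2.7 compatible, or explicitly specify /opt/conda/bin/python in your shebang. In a setup that replicated yours (Dataproc 1.4-ubuntu18 plus the jupyter.sh init action), the following worked for me: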
#!/opt/conda/bin/python
"""mapper.py"""
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")
And the reducer:
#!/opt/conda/bin/python
"""reducer.py"""
from operator import itemgetter
import sys

print_out = lambda x, y: print(f'{x}\t{y}')

current_word = None
current_count = 0
word = None

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)
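For the other option, making the scripts Python 2.7 compatible, the essential change is to drop the f-strings. The following is a rough sketch of that idea rather than a drop-in replacement: the function names map_words and reduce_counts are made up for illustration, the reducer uses itertools.groupby instead of the hand-written loop above, and the last lines simulate Hadoop's sort-by-key step locally with sorted().

```python
#!/usr/bin/env python
"""Word count helpers that parse under both Python 2.7 and Python 3."""
from __future__ import print_function

from itertools import groupby


def map_words(lines):
    """Yield one 'word<TAB>1' record per whitespace-separated word."""
    for line in lines:
        for word in line.split():
            yield "{0}\t{1}".format(word, 1)


def reduce_counts(sorted_records):
    """Sum counts per word; records must already be grouped by word."""
    # partition() always returns (word, separator, count), even without a tab.
    parsed = (record.rstrip("\n").partition("\t") for record in sorted_records)
    for word, group in groupby(parsed, key=lambda parts: parts[0]):
        total = 0
        for _, _, count in group:
            try:
                total += int(count)
            except ValueError:
                # Skip records whose count is not a number.
                continue
        yield "{0}\t{1}".format(word, total)


# Local stand-in for the shuffle phase: sort the mapper output by key.
sample = ["the quick fox", "the lazy dog the end"]
for record in reduce_counts(sorted(map_words(sample))):
    print(record)
```

To use these with Hadoop Streaming, the mapper script would iterate over map_words(sys.stdin) and the reducer script over reduce_counts(sys.stdin), printing each record.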
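Another thing to keep in mind, though, is that the jupyter.sh init action is somewhat deprecated, and you should use the actual Dataproc Jupyter Component instead. In any case, you'll probably want to run the info.sh step I outlined above to determine the relevant Python environment to use in your mapper.py and reducer.py.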
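For example, a vanilla Dataproc 1.4-debian9 cluster without the jupyter.sh init action will actually have /opt/conda/default/bin/python as the login-default Python instead of /opt/conda/bin/python. A Dataproc 1.3-debian9 image will have /usr/bin/python as the default, with Python 2.7.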
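Because the default interpreter differs between images, it can help to make a streaming script fail with a readable message instead of a bare exit code. The sketch below is one hypothetical way to do that; the names MINIMUM and check_interpreter are illustrative, and the 3.6 threshold is an assumption based on the scripts using f-strings.

```python
#!/usr/bin/env python
"""Interpreter guard for a streaming script; parses under Python 2.7 and 3."""
import sys

# Assumption: the real script needs f-strings, i.e. Python 3.6 or newer.
MINIMUM = (3, 6)


def check_interpreter(version_info, minimum=MINIMUM):
    """Return an error message if the interpreter is too old, else None."""
    found = tuple(version_info[:2])
    if found < tuple(minimum):
        return "need Python >= {0}.{1}, but {2} is {3}.{4}".format(
            minimum[0], minimum[1], sys.executable, found[0], found[1])
    return None


problem = check_interpreter(sys.version_info)
if problem:
    # Write to STDERR so the message is not mixed into the mapper output.
    sys.stderr.write(problem + "\n")
    sys.exit(1)
```

Note that the guard only helps if the file containing it has no Python 3-only syntax itself, since a syntax error is raised before any code runs; the f-string logic would need to live in a separate module imported after the check. The message should then show up in the failed task's stderr log.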
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:12 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:19 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:20 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:24 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:28 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000002_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:30 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000004_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:37 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:38 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000003_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:39 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000005_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:40 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000006_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:48 INFO mapreduce.Job: Task Id : attempt_1573523684358_0002_m_000007_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:325)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:538)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:458)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:342)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:177)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:171)
19/11/12 02:11:49 INFO mapreduce.Job: map 80% reduce 0%
19/11/12 02:11:50 INFO mapreduce.Job: map 100% reduce 100%
19/11/12 02:11:50 INFO mapreduce.Job: Job job_1573523684358_0002 failed with state FAILED due to: Task failed task_1573523684358_0002_m_000001
Job failed as tasks failed. failedMaps:1 failedReduces:0
19/11/12 02:11:50 INFO mapreduce.Job: Counters: 14
Job Counters
Failed map tasks=19
Killed map tasks=14
Killed reduce tasks=5
Launched map tasks=22
Other local map tasks=14
Rack-local map tasks=8
Total time spent by all maps in occupied slots (ms)=885928
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=221482
Total vcore-milliseconds taken by all map tasks=221482
Total megabyte-milliseconds taken by all map tasks=453595136
Map-Reduce Framework
CPU time spent (ms)=0
Physical memory (bytes) snapshot=0
Virtual memory (bytes) snapshot=0
19/11/12 02:11:50 ERROR streaming.StreamJob: Job not successful!
Streaming Command Failed!
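And honestly, I don't know what to do at this point. I've spent a lot of time on this and feel like I've hit a brick wall, because I'm not sure what is going wrong.
I also tried: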
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-files gs://data-604-hadoop/mapper.py,gs://data-604-hadoop/reducer.py \
-mapper ./mapper.py \
-reducer ./reducer.py \
-input gs://data-604-hadoop/books/pg20417.txt \
-output gs://data-604-hadoop/output
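with similar results.
Any help is appreciated.
Update:
I tried a few more things, without success.
I tried moving my Python scripts onto the Hadoop cluster itself. I then tested them with head -n100 mobydick.txt | ./mapper.py | sort | ./reducer.py and they worked. In the comments below I mention that I looked at my shebang and changed it, but that didn't help either.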
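There are a few different things going on here, but the main one boils down to this: you can't necessarily assume that the system environment of each mapper/reducer task (running as a YARN container) is the same as the system environment of your login shell. Many elements will intentionally differ in most cases (Java classpaths, for example). For Java-based MapReduce programs this normally works as intended, because you end up with similar environment variables and classpaths between the driver code that runs under the hadoop jar command and the executor code that runs on the worker nodes in YARN containers. Hadoop streaming is a bit of an oddball, since it isn't much of a first-class citizen in normal Hadoop usage.
Anyway, in this case the main problem you're hitting is that the default Python when you log into the cluster is the Conda distribution at Python 3.7, but the default Python version in the YARN environment that spawns the mapper/reducer tasks is actually Python 2.7. Since f-strings are a syntax error under Python 2.7, that would explain the "subprocess failed with code 1" errors. This is an unfortunate consequence of some legacy compatibility considerations in Dataproc. You can see it in action by hacking a mapper to act as a dump of the environment info you need; for example, try running the following commands while SSH'd into your Dataproc cluster: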
echo foo > foo.txt
hdfs dfs -mkdir hdfs:///foo
hdfs dfs -put foo.txt hdfs:///foo/foo.txt
echo '#!/bin/bash' > info.sh
echo 'which python' >> info.sh
echo 'python --version 2>&1' >> info.sh
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-Dmapred.reduce.tasks=0 \
-Dmapred.map.tasks=1 \
-files info.sh \
-input hdfs:///foo \
-output hdfs:///info-output \
-mapper ./info.sh
Your local environment will show a different version of Python than what gets printed in hdfs:///info-output:
$ bash info.sh
/opt/conda/bin/python
Python 3.7.3
$ hdfs dfs -cat hdfs:///info-output/*
/usr/bin/python
Python 2.7.15+
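The mechanism behind that difference can be illustrated with a small sketch. A shebang of `#!/usr/bin/env python` picks the first `python` found on `PATH`, so the same script can resolve to different interpreters when `PATH` differs. The two temporary directories below are hypothetical stand-ins for `/opt/conda/bin` and `/usr/bin`, and the assumption that the YARN container simply has a `PATH` without the Conda directory is inferred from the output above, not confirmed:

```python
import os
import shutil
import stat
import tempfile


def make_fake_python(directory):
    """Create an executable placeholder named 'python' inside directory."""
    path = os.path.join(directory, "python")
    with open(path, "w") as handle:
        handle.write("#!/bin/sh\n")
    os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR)
    return path


def resolve(name, search_path):
    """Return the first executable called name on search_path, like env does."""
    return shutil.which(name, path=search_path)


# Hypothetical stand-ins for /opt/conda/bin and /usr/bin.
conda_dir = tempfile.mkdtemp(prefix="conda_bin_")
system_dir = tempfile.mkdtemp(prefix="usr_bin_")
conda_python = make_fake_python(conda_dir)
system_python = make_fake_python(system_dir)

# Assumed search paths: the login shell lists the Conda directory first,
# while the container environment is assumed not to list it at all.
login_path = os.pathsep.join([conda_dir, system_dir])
yarn_path = system_dir

print(resolve("python", login_path))  # the placeholder in conda_dir
print(resolve("python", yarn_path))   # the placeholder in system_dir
```

The same lookup with the real directories is what `which python` reports in each environment.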
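This means you can either make your mapper/reducer Python 2.7 compatible, or explicitly specify /opt/conda/bin/python in the shebang. In my setup replicating yours (Dataproc 1.4-ubuntu18 plus the jupyter.sh init action), the following worked for me: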
#!/opt/conda/bin/python
"""mapper.py"""
import sys
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        #print ('%s\t%s' % (word, 1))
        print(f"{word}\t{1}")
and the reducer:
#!/opt/conda/bin/python
"""reducer.py"""
from operator import itemgetter
import sys
print_out = lambda x, y: print(f'{x}\t{y}')
current_word = None
current_count = 0
word = None
# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    #print("still working")
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            #print '%s\t%s' % (current_word, current_count)
            print_out(current_word, current_count)
        current_count = count
        current_word = word
# do not forget to output the last word if needed!
if current_word == word:
    #print '%s\t%s' % (current_word, current_count)
    print_out(current_word, current_count)
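The other option, making the scripts Python 2.7 compatible, mostly comes down to avoiding f-strings and the `print` function. Below is a minimal sketch of the same word count written so that it should run under both 2.7 and 3.x; the function names `map_lines` and `reduce_lines` are hypothetical and not part of the original scripts, and the sample input is made up for illustration:

```python
import sys
from itertools import groupby


def map_lines(lines):
    """Yield one 'word<TAB>1' record per word, using %-formatting only."""
    for line in lines:
        for word in line.split():
            yield "%s\t%d" % (word, 1)


def reduce_lines(lines):
    """Sum counts per word; expects input already sorted by word."""
    pairs = (line.rstrip("\n").split("\t", 1) for line in lines)
    # Silently drop records without a numeric count, as the original does.
    valid = (p for p in pairs if len(p) == 2 and p[1].strip().isdigit())
    for word, group in groupby(valid, key=lambda pair: pair[0]):
        yield "%s\t%d" % (word, sum(int(count) for _, count in group))


# Local stand-in for the map -> sort -> reduce pipeline on made-up input.
sample = ["the quick the", "quick fox"]
mapped = sorted(map_lines(sample))
reduced = list(reduce_lines(mapped))
for record in reduced:
    # sys.stdout.write behaves the same under Python 2 and 3.
    sys.stdout.write(record + "\n")
```

In an actual streaming job, each script would feed `sys.stdin` into the corresponding function instead of the sample list.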
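One other thing to keep in mind, though, is that the jupyter.sh init action is somewhat outdated, and you should use the actual Dataproc Jupyter Component instead. In any case, you'll probably want to run the info.sh steps I outlined above to determine the relevant Python environment to use in your mapper.py and reducer.py.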
For example, a plain Dataproc 1.4-debian9 image without the jupyter.sh init action actually has the login default Python at /opt/conda/default/bin/python rather than /opt/conda/bin/python. Dataproc 1.3-debian9 images have /usr/bin/python as the default, at Python 2.7.
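Since the right path depends on the image, a probe written in Python can report both the interpreter that actually ran it and which of the paths mentioned above exist on the node. This is only a sketch: the function name `describe_environment` is hypothetical, the candidate list contains just the three paths named in this answer, and the code avoids f-strings so it should also run under Python 2.7:

```python
import os
import sys

# Interpreter locations mentioned in this answer; other images may differ.
CANDIDATES = (
    "/opt/conda/bin/python",
    "/opt/conda/default/bin/python",
    "/usr/bin/python",
)


def describe_environment(candidates=CANDIDATES):
    """Return tab-separated records describing the running interpreter."""
    records = [
        "executable\t%s" % sys.executable,
        "version\t%s" % sys.version.split()[0],
    ]
    for path in candidates:
        usable = os.path.isfile(path) and os.access(path, os.X_OK)
        records.append("%s\t%s" % (path, "present" if usable else "missing"))
    return records


for record in describe_environment():
    sys.stdout.write(record + "\n")
```

Assuming it is saved with a `#!/usr/bin/env python` shebang and made executable, it could take the place of info.sh in the streaming command above, and its output would then show what that shebang resolves to inside the YARN container.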