Python program in AWS Elastic MapReduce fails in step execution
I am trying to launch a Python program as an Elastic MapReduce step execution. It is a Spark application with the following settings (an equivalent AWS CLI call is sketched after the list):
Deploy mode: Cluster
Spark-submit options: --executor-memory 1g
Application location: s3n://movielens-20m-dataset/Movie-Similarities-10m.py
Arguments: 260
Action on failure: Terminate cluster
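If I understand the console correctly, this step should correspond roughly to the following AWS CLI call (sketched for reference, not something I ran; the cluster ID is a placeholder):

aws emr add-steps --cluster-id j-XXXXXXXXXXXXX --steps \
    'Type=Spark,Name=MovieSimilarities,ActionOnFailure=TERMINATE_CLUSTER,Args=[--deploy-mode,cluster,--executor-memory,1g,s3n://movielens-20m-dataset/Movie-Similarities-10m.py,260]'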
Why does the launch fail on the cluster?
This is my program script:
import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

def loadMovieNames():
    movieNames = {}
    with open("s3n://movielens-20m-dataset/movies.dat") as f:
        for line in f:
            fields = line.split("::")
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames

def makePairs((user, ratings)):
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

def filterDuplicates((userID, ratings)):
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1
    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)
    score = 0
    if denominator:
        score = numerator / float(denominator)
    return (score, numPairs)

conf = SparkConf()
sc = SparkContext(conf=conf)

print "\nLoading movie names..."
nameDict = loadMovieNames()

data = sc.textFile("s3n://movielens-20m-dataset/ratings.dat")

# Map ratings to key/value pairs: user ID => (movie ID, rating)
ratings = data.map(lambda l: l.split("::")).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

# Emit every pair of movies rated together by the same user.
# Self-join to find every combination.
ratingsPartitioned = ratings.partitionBy(100)
joinedRatings = ratingsPartitioned.join(ratingsPartitioned)

# At this point the RDD consists of userID => ((movieID, rating), (movieID, rating)).
# Filter out duplicate pairs.
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(100)

# We now have (movie1, movie2) => (rating1, rating2).
# Collect all ratings for each movie pair and compute similarity.
moviePairRatings = moviePairs.groupByKey()

# We now have (movie1, movie2) => ((rating1, rating2), (rating1, rating2), ...).
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).persist()

# Extract similarities for the movie we care about that are "good".
if len(sys.argv) > 1:
    # Change these values when working with a sample set.
    scoreThreshold = 0.97
    coOccurenceThreshold = 400
    movieID = int(sys.argv[1])

    filteredResults = moviePairSimilarities.filter(lambda (pair, sim): \
        (pair[0] == movieID or pair[1] == movieID) \
        and sim[0] > scoreThreshold and sim[1] > coOccurenceThreshold)

    results = filteredResults.map(lambda (pair, sim): (sim, pair)).sortByKey(ascending=False).take(10)

    # Write to file.
    f = open("s3n://movielens-20m-dataset/movie-sims", "w")
    f.write("Top 10 similar movies for " + nameDict[movieID])
    print "Top 10 similar movies for " + nameDict[movieID]
    for result in results:
        (sim, pair) = result
        # Display the movie in the pair that isn't the one we're looking at.
        similarMovieID = pair[0]
        if similarMovieID == movieID:
            similarMovieID = pair[1]
        f.write(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))
        print nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1])
    f.close()
I get the following log:
2016-03-14T08:22:53.984Z INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --executor-memory 1g s3n://movielens-20m-dataset/Movie-Similarities-10m.py 260'
2016-03-14T08:22:53.986Z INFO Environment:
TERM=linux
CONSOLETYPE=serial
SHLVL=5
JAVA_HOME=/etc/alternatives/jre
HADOOP_IDENT_STRING=hadoop
LANGSH_SOURCED=1
XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
HADOOP_ROOT_LOGGER=INFO,DRFA
AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
UPSTART_JOB=rc
MAIL=/var/spool/mail/hadoop
EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
AWS_RDS_HOME=/opt/aws/apitools/rds
PWD=/
HOSTNAME=ip-172-31-23-45
LESS_TERMCAP_se=[0m
LOGNAME=hadoop
UPSTART_INSTANCE=
AWS_PATH=/opt/aws
LESS_TERMCAP_mb=[01;31m
_=/etc/alternatives/jre/bin/java
LESS_TERMCAP_me=[0m
NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
LESS_TERMCAP_md=[01;38;5;208m
runlevel=3
AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
UPSTART_EVENTS=runlevel
HISTSIZE=1000
previous=N
HADOOP_LOGFILE=syslog
PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
EC2_HOME=/opt/aws/apitools/ec2
HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-341AYJAV1HCS5
LESS_TERMCAP_ue=[0m
AWS_ELB_HOME=/opt/aws/apitools/elb
RUNLEVEL=3
USER=hadoop
HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-341AYJAV1HCS5/tmp
PREVLEVEL=N
HOME=/home/hadoop
HISTCONTROL=ignoredups
LESSOPEN=||/usr/bin/lesspipe.sh %s
AWS_DEFAULT_REGION=eu-central-1
LANG=en_US.UTF-8
LESS_TERMCAP_us=[04;38;5;111m
2016-03-14T08:22:53.986Z INFO redirectOutput to /mnt/var/log/hadoop/steps/s-341AYJAV1HCS5/stdout
2016-03-14T08:22:53.986Z INFO redirectError to /mnt/var/log/hadoop/steps/s-341AYJAV1HCS5/stderr
2016-03-14T08:22:53.986Z INFO Working dir /mnt/var/lib/hadoop/steps/s-341AYJAV1HCS5
2016-03-14T08:22:54.022Z INFO ProcessRunner started child process 6515 :
hadoop 6515 2312 0 08:22 ? 00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --executor-memory 1g s3n://movielens-20m-dataset/Movie-Similarities-10m.py 260
2016-03-14T08:22:54.024Z INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
2016-03-14T08:23:44.029Z INFO waitProcessCompletion ended with exit code 1 : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
2016-03-14T08:23:44.029Z INFO total process run time: 50 seconds
2016-03-14T08:23:44.155Z INFO Step created jobs:
2016-03-14T08:23:44.155Z WARN Step failed with exitCode 1 and took 50 seconds
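The step log only reports exit code 1. My understanding is that with --deploy-mode cluster the actual Python traceback ends up in the YARN container logs rather than in the step's stdout/stderr shown above, so it should be retrievable with something like the following (the application ID is a placeholder):

yarn logs -applicationId application_XXXXXXXXXXXXX_XXXX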
Connecting to the cluster via SSH and launching the program from a terminal "solves" the issue. The movies.dat file also needs to be on the cluster's local file system.
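In case it helps to see what I mean, here is a minimal sketch (my own untested workaround idea, not verified on EMR) that loads movies.dat through Spark instead of Python's open(), since open() cannot read an s3n:// URL. The open(..., "w") call used for the results presumably has the same problem and would need something like RDD.saveAsTextFile instead.

def loadMovieNames(sc):
    # Let Spark read the file, since it understands s3n:// URLs;
    # Python's built-in open() only handles local paths.
    lines = sc.textFile("s3n://movielens-20m-dataset/movies.dat")
    return lines.map(lambda l: l.split("::")) \
                .map(lambda fields: (int(fields[0]), fields[1])) \
                .collectAsMap()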