Python program in AWS Elastic MapReduce fails in step execution

I am trying to launch a Python program through an Elastic MapReduce Step Execution. It is a Spark application configured with the following parameters (an equivalent programmatic submission is sketched below):

- Deploy mode: cluster
- Spark-submit options: --executor-memory 1g
- Application location: s3n://movielens-20m-dataset/Movie-Similarities-10m.py
- Arguments: 260
- Action on failure: Terminate cluster
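For reference, a minimal sketch of what submitting the same step programmatically might look like with boto3 (the cluster ID is a placeholder, and using boto3 at all is my assumption; in practice the step was configured through the console):

import boto3

# Hypothetical cluster ID; the region matches AWS_DEFAULT_REGION in the log below.
emr = boto3.client("emr", region_name="eu-central-1")

response = emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXX",
    Steps=[{
        "Name": "Movie-Similarities-10m",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            # command-runner.jar hands its arguments to spark-submit,
            # matching the invocation visible in the step log below.
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "--executor-memory", "1g",
                "s3n://movielens-20m-dataset/Movie-Similarities-10m.py",
                "260",
            ],
        },
    }],
)
print(response["StepIds"])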

Why does the step fail when it runs on the cluster?

Here is my program script:

import sys
from pyspark import SparkConf, SparkContext
from math import sqrt

def loadMovieNames():
    movieNames = {}
    with open("s3n://movielens-20m-dataset/movies.dat") as f:
        for line in f:
            fields = line.split("::")
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames

def makePairs((user, ratings)):
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return ((movie1, movie2), (rating1, rating2))

def filterDuplicates( (userID, ratings) ):
    (movie1, rating1) = ratings[0]
    (movie2, rating2) = ratings[1]
    return movie1 < movie2

def computeCosineSimilarity(ratingPairs):
    numPairs = 0
    sum_xx = sum_yy = sum_xy = 0
    for ratingX, ratingY in ratingPairs:
        sum_xx += ratingX * ratingX
        sum_yy += ratingY * ratingY
        sum_xy += ratingX * ratingY
        numPairs += 1

    numerator = sum_xy
    denominator = sqrt(sum_xx) * sqrt(sum_yy)

    score = 0
    if (denominator):
        score = (numerator / (float(denominator)))

    return (score, numPairs)


conf = SparkConf()
sc = SparkContext(conf = conf)


print "\nLoading movie names..."
nameDict = loadMovieNames()

data = sc.textFile("s3n://movielens-20m-dataset/ratings.dat")

# Map ratings to key / value pairs: user ID => movie ID, rating
ratings = data.map(lambda l: l.split("::")).map(lambda l: (int(l[0]), (int(l[1]), float(l[2]))))

# Emit every movie rated together by the same user.
# Self-join to find every combination.
ratingsPartitioned = ratings.partitionBy(100)
joinedRatings = ratingsPartitioned.join(ratingsPartitioned)

# At this point our RDD consists of userID => ((movieID, rating), (movieID, rating))

# Filter out duplicate pairs
uniqueJoinedRatings = joinedRatings.filter(filterDuplicates)

# Now key by (movie1, movie2) pairs.
moviePairs = uniqueJoinedRatings.map(makePairs).partitionBy(100)

# We now have (movie1, movie2) => (rating1, rating2)
# Now collect all ratings for each movie pair and compute similarity
moviePairRatings = moviePairs.groupByKey()

# We now have (movie1, movie2) => (rating1, rating2), (rating1, rating2) ...
# Can now compute similarities.
moviePairSimilarities = moviePairRatings.mapValues(computeCosineSimilarity).persist()

# Extract similarities for the movie we care about that are "good".
if (len(sys.argv) > 1):

    #change values while working with a sample set
    scoreThreshold = 0.97
    coOccurenceThreshold = 400

    movieID = int(sys.argv[1])

    filteredResults = moviePairSimilarities.filter(lambda ((pair,sim)): \
        (pair[0] == movieID or pair[1] == movieID) \
        and sim[0] > scoreThreshold and sim[1] > coOccurenceThreshold)

    results = filteredResults.map(lambda ((pair,sim)): (sim, pair)).sortByKey(ascending = False).take(10)

    #Write to file
    f = open("s3n://movielens-20m-dataset/movie-sims", "w")

    f.write("Top 10 similar movies for " + nameDict[movieID])
    print "Top 10 similar movies for " + nameDict[movieID]
    for result in results:
        (sim, pair) = result
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = pair[0]
        if (similarMovieID == movieID):
            similarMovieID = pair[1]
        f.write(nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1]))
        print nameDict[similarMovieID] + "\tscore: " + str(sim[0]) + "\tstrength: " + str(sim[1])
    f.close()

I get the following log:

2016-03-14T08:22:53.984Z INFO startExec 'hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --executor-memory 1g s3n://movielens-20m-dataset/Movie-Similarities-10m.py 260'
2016-03-14T08:22:53.986Z INFO Environment:
  TERM=linux
  CONSOLETYPE=serial
  SHLVL=5
  JAVA_HOME=/etc/alternatives/jre
  HADOOP_IDENT_STRING=hadoop
  LANGSH_SOURCED=1
  XFILESEARCHPATH=/usr/dt/app-defaults/%L/Dt
  HADOOP_ROOT_LOGGER=INFO,DRFA
  AWS_CLOUDWATCH_HOME=/opt/aws/apitools/mon
  UPSTART_JOB=rc
  MAIL=/var/spool/mail/hadoop
  EC2_AMITOOL_HOME=/opt/aws/amitools/ec2
  AWS_RDS_HOME=/opt/aws/apitools/rds
  PWD=/
  HOSTNAME=ip-172-31-23-45
  LESS_TERMCAP_se=[0m
  LOGNAME=hadoop
  UPSTART_INSTANCE=
  AWS_PATH=/opt/aws
  LESS_TERMCAP_mb=[01;31m
  _=/etc/alternatives/jre/bin/java
  LESS_TERMCAP_me=[0m
  NLSPATH=/usr/dt/lib/nls/msg/%L/%N.cat
  LESS_TERMCAP_md=[01;38;5;208m
  runlevel=3
  AWS_AUTO_SCALING_HOME=/opt/aws/apitools/as
  UPSTART_EVENTS=runlevel
  HISTSIZE=1000
  previous=N
  HADOOP_LOGFILE=syslog
  PATH=/sbin:/usr/sbin:/bin:/usr/bin:/usr/local/sbin:/opt/aws/bin
  EC2_HOME=/opt/aws/apitools/ec2
  HADOOP_LOG_DIR=/mnt/var/log/hadoop/steps/s-341AYJAV1HCS5
  LESS_TERMCAP_ue=[0m
  AWS_ELB_HOME=/opt/aws/apitools/elb
  RUNLEVEL=3
  USER=hadoop
  HADOOP_CLIENT_OPTS=-Djava.io.tmpdir=/mnt/var/lib/hadoop/steps/s-341AYJAV1HCS5/tmp
  PREVLEVEL=N
  HOME=/home/hadoop
  HISTCONTROL=ignoredups
  LESSOPEN=||/usr/bin/lesspipe.sh %s
  AWS_DEFAULT_REGION=eu-central-1
  LANG=en_US.UTF-8
  LESS_TERMCAP_us=[04;38;5;111m
2016-03-14T08:22:53.986Z INFO redirectOutput to /mnt/var/log/hadoop/steps/s-341AYJAV1HCS5/stdout
2016-03-14T08:22:53.986Z INFO redirectError to /mnt/var/log/hadoop/steps/s-341AYJAV1HCS5/stderr
2016-03-14T08:22:53.986Z INFO Working dir /mnt/var/lib/hadoop/steps/s-341AYJAV1HCS5
2016-03-14T08:22:54.022Z INFO ProcessRunner started child process 6515 :
hadoop    6515  2312  0 08:22 ?        00:00:00 bash /usr/lib/hadoop/bin/hadoop jar /var/lib/aws/emr/step-runner/hadoop-jars/command-runner.jar spark-submit --deploy-mode cluster --executor-memory 1g s3n://movielens-20m-dataset/Movie-Similarities-10m.py 260
2016-03-14T08:22:54.024Z INFO Synchronously wait child process to complete : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
2016-03-14T08:23:44.029Z INFO waitProcessCompletion ended with exit code 1 : hadoop jar /var/lib/aws/emr/step-runner/hadoop-...
2016-03-14T08:23:44.029Z INFO total process run time: 50 seconds
2016-03-14T08:23:44.155Z INFO Step created jobs: 
2016-03-14T08:23:44.155Z WARN Step failed with exitCode 1 and took 50 seconds

Connecting to the cluster over SSH and launching the program from a terminal "solves" the problem, but the movies.dat file then also has to be on the cluster's local filesystem.
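The controller log above only reports exit code 1; the actual Python traceback should be in the step's stderr (/mnt/var/log/hadoop/steps/s-341AYJAV1HCS5/stderr, per the redirectError line). The likely cause: with --deploy-mode cluster the driver runs on a cluster node, and Python's built-in open() treats "s3n://..." as a local path, so both loadMovieNames() and the result-writing block raise IOError there. Below is a minimal sketch, keeping the same bucket paths and Python 2 style as the script above, of replacing the two open() calls with Spark I/O so the job can also run as a cluster-mode step:

def loadMovieNames(sc):
    # Read movies.dat through Spark rather than open(), so it works
    # wherever the driver runs; collectAsMap() brings the small
    # movieID -> title dictionary back to the driver.
    lines = sc.textFile("s3n://movielens-20m-dataset/movies.dat")
    return lines.map(lambda l: l.split("::")) \
                .map(lambda fields: (int(fields[0]), fields[1])) \
                .collectAsMap()

# ...and write the top-10 list back through Spark instead of open():
results = filteredResults.map(lambda (pair, sim): (sim, pair)) \
                         .sortByKey(ascending = False).take(10)
# saveAsTextFile writes a directory of part files; the target path must not already exist.
sc.parallelize(results, 1).saveAsTextFile("s3n://movielens-20m-dataset/movie-sims")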