PySpark: joining two RDDs results in an empty RDD
I'm new to Spark and I'm trying to adapt this movie recommendation tutorial (https://databricks-training.s3.amazonaws.com/movie-recommendation-with-mllib.html) to my own dataset, but it keeps throwing this error:
ValueError: Can not reduce() empty RDD
This is the function that computes the model's root mean squared error:
def computeRmse(model, data, n):
    """
    Compute RMSE (Root Mean Squared Error).
    """
    predictions = model.predictAll(data.map(lambda x: (x[0], x[1])))
    print predictions.count()
    print predictions.first()
    print "predictions above"
    print data.count()
    print data.first()
    print "validation data above"
    # the join below is line 56 of MyappALS.py (referenced in the DAGScheduler log)
    predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
        .join(data.map(lambda line: line.split(',')).map(lambda x: ((x[0], x[1]), x[2]))) \
        .values()
    print predictionsAndRatings.count()
    print "predictions And Ratings above"
    # the reduce below is line 63 of MyappALS.py (referenced in the traceback)
    return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
The model is ALS.train(training, rank, numIter, lambda), and data is the validation dataset.
The training and validation sets originally come from a ratings.txt file in the format: userID,productID,rating,ratingopID
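For reference, here is a minimal sketch of how such a file is presumably parsed into the validation RDD (the helper name parseRating and the textFile call are assumptions, not the actual code). The key point is that split() leaves every field as a string:

from pyspark import SparkContext

sc = SparkContext(appName="MyappALS")

def parseRating(line):
    # hypothetical helper: "userID,productID,rating,ratingopID" -> (userID, productID, rating)
    fields = line.split(',')
    # nothing is cast to int/float here, so every field stays a unicode string
    return (fields[0], fields[1], fields[2])

validation = sc.textFile("ratings.txt").map(parseRating)
print validation.first()   # e.g. (u'640085', u'1590', u'5') -- all strings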
Here is part of the output:
879
...
Rating(user=0, product=656, rating=4.122132631144641)
predictions above
...
1164
...
(u'640085', u'1590', u'5')
validation data above
...
16/08/26 12:47:18 INFO DAGScheduler: Registering RDD 259 (join at /path/myapp/MyappALS.py:56)
16/08/26 12:47:18 INFO DAGScheduler: Got job 20 (count at /path/myapp/MyappALS.py:59) with 12 output partitions
16/08/26 12:47:18 INFO DAGScheduler: Final stage: ResultStage 238 (count at /path/myapp/MyappALS.py:59)
16/08/26 12:47:18 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 237)
16/08/26 12:47:18 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 237)
16/08/26 12:47:18 INFO DAGScheduler: Submitting ShuffleMapStage 237 (PairwiseRDD[259] at join at /path/myapp/MyappALS.py:56), which has no missing parents
....
0
predictions And Ratings above
...
Traceback (most recent call last):
File "/path/myapp/MyappALS.py", line 130, in <module>
validationRmse = computeRmse(model, validation, numValidation)
File "/path/myapp/MyappALS.py", line 63, in computeRmse
return sqrt(predictionsAndRatings.map(lambda x: (x[0] - x[1]) ** 2).reduce(add) / float(n))
File "/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 805, in reduce
ValueError: Can not reduce() empty RDD
So from count() I'm sure the initial RDDs are not empty.
Then, does the INFO log Registering RDD 259 (join at /path/myapp/MyappALS.py:56)
mean that the join job was started?
Am I missing something?
Thanks.
The error disappeared when I added int() to:
predictionsAndRatings = predictions.map(lambda x: ((x[0], x[1]), x[2])) \
    .join(data.map(lambda x: ((int(x[0]), int(x[1])), int(x[2])))) \
    .values()
We think this is because the predictions come from predictAll, a method that returns tuples with numeric user and product IDs, while the other data was parsed manually by our own code, leaving its fields as strings.
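The root cause is worth making explicit: RDD.join matches keys by equality, and an int key never equals a string key, so predictions keyed by (int, int) can never match validation data keyed by (unicode, unicode), and the join silently produces an empty RDD. A minimal, self-contained reproduction (assuming sc is an active SparkContext):

# int keys, as predictAll returns them
preds = sc.parallelize([((640085, 1590), 4.12)])
# unicode string keys, as produced by manual split(',') parsing
rates = sc.parallelize([((u'640085', u'1590'), u'5')])

print preds.join(rates).count()        # 0: (640085, 1590) != (u'640085', u'1590')

# casting the keys (and the rating) to int makes the keys equal, so the join matches
ratesInt = rates.map(lambda kv: ((int(kv[0][0]), int(kv[0][1])), int(kv[1])))
print preds.join(ratesInt).count()     # 1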