RDD 访问另一个 RDD 中的值

RDD accessing values in another RDD

我有一个 RDD 需要从另一个 RDD 访问数据。但是,我总是收到 Task not Serializable 错误。我已经扩展了 Serializable Class 但是它没有用。密码是:

val oldError = rddOfRatings.aggregate(0.0)((accum, rating) =>
accum + calcError(rating.rating,
us.lookup(rating.user)(0),
it.lookup(rating.product)(0)).abs, _+_ ) / rddSize

其中 usitrddOfRatings 是其他 RDD 的。我不明白的是,如果 RDD 是不可变的,那么为什么它不允许我从另一个 RDD 中访问 RDD?问题似乎出在 usit 上,因为当我为本地 collection 删除它们时它工作正常。

谢谢。

RDD 确实不可序列化,因为它们必须捕获变量(例如 SparkContext)。为了解决这个问题,将三个 RDD 连接在一起,您将在累加器闭包中拥有所有必要的值。

rdd.lookup1 是一项昂贵的操作,即使可以,您也可能不想这样做。

此外,"serializing" RDD 没有意义,因为 RDD 只是对数据的引用,而不是数据本身。

此处采用的方法可能取决于这些数据集的大小。如果 usit RDD 的大小与 rddOfRatings 大致相同(这就是它的样子,考虑到预期的查找),最好的方法是事先加入它们。

// 请注意,我不知道您的集合的实际结构,所以以此为例

val ratingErrorByUser = us.map(u => (u.id, u.error))
val ratingErrorByProduct = it.map(i=> (i.id, i.error)) 
val ratingsBykey = rddOfRatings.map(r=> (r.user, (r.product, r.rating)))
val ratingsWithUserError = ratingsByKey.join(ratingErrorByUser)
val ratingsWithProductError = ratingsWithUserError.map{case (userId, ((prodId, rating),userErr))} => (prodId,(rating, userErr))}
val allErrors = ratingsWithProductError.join(ratingErrorByProduct)
val totalErr = allErrors.map{case (prodId,((rating, userErr),prodErr)) => calcError(userErr, math.abs(prodErr), rating)}.reduce(_+_)
val total = totalErr / rddOfRatings.count

使用 Spark DataFrame API

可能会容易得多

1 如果查找是必须的(在这种情况下看起来不像!),看看 Spark IndexedRdd