RDD 访问另一个 RDD 中的值
RDD accessing values in another RDD
我有一个 RDD
需要从另一个 RDD
访问数据。但是,我总是收到 Task not Serializable
错误。我已经扩展了 Serializable
Class 但是它没有用。密码是:
val oldError = rddOfRatings.aggregate(0.0)((accum, rating) =>
accum + calcError(rating.rating,
us.lookup(rating.user)(0),
it.lookup(rating.product)(0)).abs, _+_ ) / rddSize
其中 us
、it
和 rddOfRatings
是其他 RDD
的。我不明白的是,如果 RDD
是不可变的,那么为什么它不允许我从另一个 RDD
中访问 RDD
?问题似乎出在 us
和 it
上,因为当我为本地 collection 删除它们时它工作正常。
谢谢。
RDD 确实不可序列化,因为它们必须捕获变量(例如 SparkContext)。为了解决这个问题,将三个 RDD 连接在一起,您将在累加器闭包中拥有所有必要的值。
rdd.lookup
1 是一项昂贵的操作,即使可以,您也可能不想这样做。
此外,"serializing" RDD 没有意义,因为 RDD 只是对数据的引用,而不是数据本身。
此处采用的方法可能取决于这些数据集的大小。如果 us
和 it
RDD 的大小与 rddOfRatings
大致相同(这就是它的样子,考虑到预期的查找),最好的方法是事先加入它们。
// 请注意,我不知道您的集合的实际结构,所以以此为例
val ratingErrorByUser = us.map(u => (u.id, u.error))
val ratingErrorByProduct = it.map(i=> (i.id, i.error))
val ratingsBykey = rddOfRatings.map(r=> (r.user, (r.product, r.rating)))
val ratingsWithUserError = ratingsByKey.join(ratingErrorByUser)
val ratingsWithProductError = ratingsWithUserError.map{case (userId, ((prodId, rating),userErr))} => (prodId,(rating, userErr))}
val allErrors = ratingsWithProductError.join(ratingErrorByProduct)
val totalErr = allErrors.map{case (prodId,((rating, userErr),prodErr)) => calcError(userErr, math.abs(prodErr), rating)}.reduce(_+_)
val total = totalErr / rddOfRatings.count
可能会容易得多
1 如果查找是必须的(在这种情况下看起来不像!),看看 Spark IndexedRdd
我有一个 RDD
需要从另一个 RDD
访问数据。但是,我总是收到 Task not Serializable
错误。我已经扩展了 Serializable
Class 但是它没有用。密码是:
val oldError = rddOfRatings.aggregate(0.0)((accum, rating) =>
accum + calcError(rating.rating,
us.lookup(rating.user)(0),
it.lookup(rating.product)(0)).abs, _+_ ) / rddSize
其中 us
、it
和 rddOfRatings
是其他 RDD
的。我不明白的是,如果 RDD
是不可变的,那么为什么它不允许我从另一个 RDD
中访问 RDD
?问题似乎出在 us
和 it
上,因为当我为本地 collection 删除它们时它工作正常。
谢谢。
RDD 确实不可序列化,因为它们必须捕获变量(例如 SparkContext)。为了解决这个问题,将三个 RDD 连接在一起,您将在累加器闭包中拥有所有必要的值。
rdd.lookup
1 是一项昂贵的操作,即使可以,您也可能不想这样做。
此外,"serializing" RDD 没有意义,因为 RDD 只是对数据的引用,而不是数据本身。
此处采用的方法可能取决于这些数据集的大小。如果 us
和 it
RDD 的大小与 rddOfRatings
大致相同(这就是它的样子,考虑到预期的查找),最好的方法是事先加入它们。
// 请注意,我不知道您的集合的实际结构,所以以此为例
val ratingErrorByUser = us.map(u => (u.id, u.error))
val ratingErrorByProduct = it.map(i=> (i.id, i.error))
val ratingsBykey = rddOfRatings.map(r=> (r.user, (r.product, r.rating)))
val ratingsWithUserError = ratingsByKey.join(ratingErrorByUser)
val ratingsWithProductError = ratingsWithUserError.map{case (userId, ((prodId, rating),userErr))} => (prodId,(rating, userErr))}
val allErrors = ratingsWithProductError.join(ratingErrorByProduct)
val totalErr = allErrors.map{case (prodId,((rating, userErr),prodErr)) => calcError(userErr, math.abs(prodErr), rating)}.reduce(_+_)
val total = totalErr / rddOfRatings.count
可能会容易得多
1 如果查找是必须的(在这种情况下看起来不像!),看看 Spark IndexedRdd