ALS 的 OutOfBoundsException - Flink MLlib

OutOfBoundsException with ALS - Flink MLlib

我正在做一个电影推荐系统,使用此处提供的 MovieLens 数据集: http://grouplens.org/datasets/movielens/

为了计算这个推荐系统,我在scala中使用了Flink的ML库,特别是ALS算法(org.apache.flink.ml.recommendation.ALS)。

我首先将电影的评级映射到 DataSet[(Int, Int, Double)],然后创建 trainingSettestSet(请参见下面的代码)。

我的问题是,当我对整个数据集(所有评级)使用 ALS.fit 函数时没有错误,但如果我只删除一个评级,则拟合函数不会工作了,我不明白为什么。

你有什么想法吗? :)

使用代码:

Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double)

PreProcessing.scala

object PreProcessing {

def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
      env.readCsvFile[(Int, Int, Double)](
      ratingsPath, ignoreFirstLine = true,
      includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}

Processing.scala

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)

    val trainingSet : DataSet[(Int, Int, Double)] =
    ratings
     .map(r => (r.userId, r.movieId, r.rating))
     .sortPartition(0, Order.ASCENDING)
     .first(ratings.count().toInt)

    val als = ALS()
     .setIterations(10)
     .setNumFactors(10)
     .setBlocks(150)
     .setTemporaryPath("/tmp/tmpALS")

    val parameters = ParameterMap()
     .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
     .add(ALS.Seed, 42L)

    als.fit(trainingSet, parameters)
  }
}

"But if I just remove only one rating"

val trainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .sortPartition(0, Order.ASCENDING)
    .first((ratings.count()-1).toInt)

错误:

06/19/2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) switched to FAILED

java.lang.ArrayIndexOutOfBoundsException: 5

at org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)

at org.apache.flink.ml.recommendation.ALS$$anon1.coGroup(ALS.scala:635)

at org.apache.flink.runtime.operators.CoGroupDriver.run(CoGroupDriver.java:152)

...

问题是first运算符与Flink的ALS实现的setTemporaryPath参数结合使用。为了理解这个问题,让我快速解释一下阻塞 ALS 算法是如何工作的。

交替最小二乘法的分块实现首先将给定的评分矩阵按用户和项目划分为块。对于这些块,计算路由信息。该路由信息表示哪个 user/item 块分别从哪个 item/user 块接收哪个输入。之后,开始ALS迭代。

由于 Flink 的底层执行引擎是并行流式数据流引擎,它会尝试以流水线方式执行尽可能多的数据流部分。这需要让管道的​​所有操作员同时在线。这具有 Flink 避免具体化中间结果的优点,中间结果可能非常大。缺点是可用内存必须在所有 运行 运算符之间共享。在 ALS 的情况下,单个 DataSet 元素(例如 user/item 块)的大小相当大,这是不需要的。

为了解决这个问题,如果你设置了一个temporaryPath,并不是所有的实现操作符都同时执行。该路径定义了可以存储中间结果的位置。因此,如果您定义了一个临时路径,那么 ALS 首先计算用户块的路由信息​​并将它们写入磁盘,然后计算项目块的路由信息​​并将它们写入磁盘,最后但尤其是它开始 ALS 迭代,从临时路径读取路由信息。

用户和项目块的路由信息​​的计算都依赖于给定的评分数据集。在您的情况下,当您计算用户路由信息时,它将首先读取评级数据集并对其应用 first 运算符。 first 运算符 returns n - 来自基础数据集的任意元素。现在的问题是 Flink 没有存储这个 first 操作的结果来计算 item 路由信息。相反,当您开始计算项目路由信息时,Flink 将从其源开始重新执行数据流。这意味着它从磁盘读取评级数据集并再次对其应用 first 运算符。在许多情况下,与第一个 first 操作的结果相比,这会给您一组不同的评级。所以生成的路由信息​​不一致,ALS失败

您可以通过具体化 first 运算符的结果并将此结果用作 ALS 算法的输入来规避此问题。对象 FlinkMLTools 包含一个方法 persist,它采用 DataSet,将其写入给定路径,然后 returns 一个新的 DataSet 读取刚刚写入的 DataSet。这允许您分解生成的数据流图。

val firstTrainingSet : DataSet[(Int, Int, Double)] =
  ratings
    .map(r => (r.userId, r.movieId, r.rating))
    .first((ratings.count()-1).toInt)

val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")

val als = ALS()
  .setIterations(10)
  .setNumFactors(10)
  .setBlocks(150)
  .setTemporaryPath("/tmp/tmpALS/")

val parameters = ParameterMap()
  .add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
  .add(ALS.Seed, 42L)

als.fit(trainingSet, parameters)

或者,您可以尝试不设置 temporaryPath。然后以流水线方式执行所有步骤(路由信息计算和als迭代)。这意味着用户和项目路由信息计算使用相同的输入数据集,该数据集来自 first 运算符。

Flink 社区目前正在致力于将算子的中间结果保存在内存中。这将允许固定 first 运算符的结果,这样它就不会被计算两次,因此不会由于其不确定性而给出不同的结果。