如果用户 ID 是字符串而不是连续的整数,如何使用 mllib.recommendation?

How to use mllib.recommendation if the user ids are string instead of contiguous integers?

我想使用 Spark 的 mllib.recommendation 库构建推荐系统原型。但是,我拥有的用户数据的格式是以下格式:



看起来必须在真实用户 ID 和 Spark 使用的数字 ID 之间进行某种转换。但是我应该怎么做呢?

Spark 并不真正需要数字 ID,它只需要一些唯一的值,但为了实现他们选择了 Int。


  case class MyRating(userId: String, product: Int, rating: Double)

  val data: RDD[MyRating] = ???

  // Assign unique Long id for each userId
  val userIdToInt: RDD[(String, Long)] = 

  // Reverse mapping from generated id to original
  val reverseMapping: RDD[(Long, String)]
    userIdToInt map { case (l, r) => (r, l) }

  // Depends on data size, maybe too big to keep
  // on single machine
  val map: Map[String, Int] = 

  // Transform to MLLib rating
  val rating: RDD[Rating] = data.map { r =>
    Rating(userIdToInt.lookup(r.userId).head.toInt, r.product, r.rating)
    // -- or
    Rating(map(r.userId), r.product, r.rating)

  // ... train model

  // ... get back to MyRating userId from Int

  val someUserId: String = reverseMapping.lookup(123).head

您也可以尝试 'data.zipWithUniqueId()',但我不确定在这种情况下,即使数据集很小,.toInt 转换是否安全。

上述解决方案可能并不像我发现的那样总是有效。 Spark 无法从其他 RDD 中执行 RDD 转换。错误输出:

org.apache.spark.SparkException: RDD transformations and actions can only be enter code hereinvoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

作为一种解决方案,您可以将 userIdToInt RDD 与原始数据 RDD 结合起来,以存储 userId 和 uniqueId 之间的关系。然后稍后你可以再次将结果 RDD 与这个 RDD 连接起来。

// Create RDD with the unique id included
val dataWithUniqueUserId: RDD[(String, Int, Int, Double)] = 
    data.keyBy(_.userId).join(userIdToInt).map(r => 
        (r._2._1.userId, r._2._2.toInt, r._2._1.productId, 1))

您需要 运行 跨用户 ID 的 StringIndexer 以将字符串转换为唯一整数索引。它们不必是连续的。

df 是 (user:String, product,rating)

  val stringindexer = new StringIndexer()
  val modelc = stringindexer.fit(df)
  val  df = modelc.transform(df)

@Ganesh Krishnan 是对的,StringIndexer 解决了这个问题。

from pyspark.ml.feature import OneHotEncoder, StringIndexer
from pyspark.sql import SQLContext
>>> spark = SQLContext(sc)                                                                             
>>> df = spark.createDataFrame(
...     [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
...     ["id", "category"])

| id|category|
|  0|       a|
|  1|       b|
|  2|       c|
|  3|       a|
|  4|       a|
|  5|       c|
>>> stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
>>> model = stringIndexer.fit(df)
>>> indexed = model.transform(df)
>>> indexed.show()
| id|category|categoryIndex|
|  0|       a|          0.0|
|  1|       b|          2.0|
|  2|       c|          1.0|
|  3|       a|          0.0|
|  4|       a|          0.0|
|  5|       c|          1.0|

>>> converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
>>> converted = converter.transform(indexed)
>>> converted.show()
| id|category|categoryIndex|originalCategory|
|  0|       a|          0.0|               a|
|  1|       b|          2.0|               b|
|  2|       c|          1.0|               c|
|  3|       a|          0.0|               a|
|  4|       a|          0.0|               a|
|  5|       c|          1.0|               c|

>>> converted.select("id", "originalCategory").show()
| id|originalCategory|
|  0|               a|
|  1|               b|
|  2|               c|
|  3|               a|
|  4|               a|
|  5|               c|