运行 Mahout RowSimilarity 推荐器在 MongoDB 数据上
Run Mahout RowSimilarity recommender on MongoDB data
我已经设法 运行 Mahout rowsimilarity 在以下格式的平面文件上:
item-id tag1 tag-2 tag3
这必须通过 cli 运行 并且输出再次是平面文件。我想让它从 MongoDB 读取数据(也可以使用其他数据库),然后将输出转储到数据库,然后可以从我们的系统中选择。
我研究了这几天,发现以下内容:
- 将必须编写实现 RowSimilarity 的 Scala 代码
- 向它传递一个 IndexedDataSet 对象来处理数据
- 将输出转换为所需格式(json/csv)
我还没有弄清楚的是如何将数据从 DB 导入到 IndexedDataSet。此外,我已经阅读了有关 RDD 格式的信息,但仍然无法弄清楚如何将 json 数据转换为 RowSimilarity 代码可以使用的 RDD。
tl;dr:如何转换 MongoDB 数据以便可以通过 mahout/spark rowsimilarity 处理?
Edit1:我从这个 link 中找到了一些将 Mongodata 转换为 RDD 的代码:https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage#scala-example
现在我需要帮助将其转换为 IndexedDataset,以便将其传递给 SimilarityAnalysis.rowSimilarityIDS。
tl;dr:如何将 RDD 转换为 IndexedDataset
答案如下:
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.Schema
import org.apache.mahout.sparkbindings
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat
object SparkExample extends App {
implicit val mc = sparkbindings.mahoutSparkContext(masterUrl = "local", appName = "RowSimilarity")
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://hostname:27017/db.collection")
val documents: RDD[(Object, BSONObject)] = mc.newAPIHadoopRDD(
mongoConfig,
classOf[MongoInputFormat],
classOf[Object],
classOf[BSONObject]
)
val documents_Array: RDD[(String, Array[String])] = documents.map(
doc1 => (
doc1._2.get("product_id").toString(),
doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-").mkString(" "))
)
)
val new_doc: RDD[(String, String)] = documents_Array.flatMapValues(x => x)
val myIDs = IndexedDatasetSpark(new_doc)(mc)
val readWriteSchema = new Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"omitScore" -> false,
"elementDelim" -> " "
)
SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://hadoop:9000/mongo-hadoop-rowsimilarity", readWriteSchema)(mc)
}
build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
我已经使用 mongo-hadoop 从 Mongo 获取数据并使用它。由于我的数据有一个数组,我不得不使用 flatMapValues 来展平它,然后传递给 IDS 以获得正确的输出。
PS:我在这里而不是 linked question 发布了答案,因为这个问答涵盖了获取数据和处理数据的全部范围。
我已经设法 运行 Mahout rowsimilarity 在以下格式的平面文件上:
item-id tag1 tag-2 tag3
这必须通过 cli 运行 并且输出再次是平面文件。我想让它从 MongoDB 读取数据(也可以使用其他数据库),然后将输出转储到数据库,然后可以从我们的系统中选择。
我研究了这几天,发现以下内容:
- 将必须编写实现 RowSimilarity 的 Scala 代码
- 向它传递一个 IndexedDataSet 对象来处理数据
- 将输出转换为所需格式(json/csv)
我还没有弄清楚的是如何将数据从 DB 导入到 IndexedDataSet。此外,我已经阅读了有关 RDD 格式的信息,但仍然无法弄清楚如何将 json 数据转换为 RowSimilarity 代码可以使用的 RDD。
tl;dr:如何转换 MongoDB 数据以便可以通过 mahout/spark rowsimilarity 处理?
Edit1:我从这个 link 中找到了一些将 Mongodata 转换为 RDD 的代码:https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage#scala-example
现在我需要帮助将其转换为 IndexedDataset,以便将其传递给 SimilarityAnalysis.rowSimilarityIDS。
tl;dr:如何将 RDD 转换为 IndexedDataset
答案如下:
import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.math.indexeddataset.Schema
import org.apache.mahout.sparkbindings
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat
object SparkExample extends App {
implicit val mc = sparkbindings.mahoutSparkContext(masterUrl = "local", appName = "RowSimilarity")
val mongoConfig = new Configuration()
mongoConfig.set("mongo.input.uri", "mongodb://hostname:27017/db.collection")
val documents: RDD[(Object, BSONObject)] = mc.newAPIHadoopRDD(
mongoConfig,
classOf[MongoInputFormat],
classOf[Object],
classOf[BSONObject]
)
val documents_Array: RDD[(String, Array[String])] = documents.map(
doc1 => (
doc1._2.get("product_id").toString(),
doc1._2.get("product_attribute_value").toString().replace("[ \"", "").replace("\"]", "").split("\" , \"").map(value => value.toLowerCase.replace(" ", "-").mkString(" "))
)
)
val new_doc: RDD[(String, String)] = documents_Array.flatMapValues(x => x)
val myIDs = IndexedDatasetSpark(new_doc)(mc)
val readWriteSchema = new Schema(
"rowKeyDelim" -> "\t",
"columnIdStrengthDelim" -> ":",
"omitScore" -> false,
"elementDelim" -> " "
)
SimilarityAnalysis.rowSimilarityIDS(myIDs).dfsWrite("hdfs://hadoop:9000/mongo-hadoop-rowsimilarity", readWriteSchema)(mc)
}
build.sbt:
name := "scala-mongo"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.mongodb" %% "casbah" % "3.1.1"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
libraryDependencies += "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.2"
libraryDependencies ++= Seq(
"org.apache.hadoop" % "hadoop-client" % "2.6.0" exclude("javax.servlet", "servlet-api") exclude ("com.sun.jmx", "jmxri") exclude ("com.sun.jdmk", "jmxtools") exclude ("javax.jms", "jms") exclude ("org.slf4j", "slf4j-log4j12") exclude("hsqldb","hsqldb"),
"org.scalatest" % "scalatest_2.10" % "1.9.2" % "test"
)
libraryDependencies += "org.apache.mahout" % "mahout-math-scala_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-spark_2.10" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-math" % "0.11.2"
libraryDependencies += "org.apache.mahout" % "mahout-hdfs" % "0.11.2"
resolvers += "typesafe repo" at " http://repo.typesafe.com/typesafe/releases/"
resolvers += Resolver.mavenLocal
我已经使用 mongo-hadoop 从 Mongo 获取数据并使用它。由于我的数据有一个数组,我不得不使用 flatMapValues 来展平它,然后传递给 IDS 以获得正确的输出。
PS:我在这里而不是 linked question 发布了答案,因为这个问答涵盖了获取数据和处理数据的全部范围。