如何将代码和数据集分发到工作节点上?
How to distribute code and dataset onto worker nodes?
我一直在使用数据集 Movielens(2000 万条记录)并且一直在 Spark MLlib 中使用 collaborative filtering。
我的环境是 Ubuntu VirtualBox 上的 14.4。我有一个主节点和 2 个从节点。我使用了已发布的 Apache Hadoop、Apache Spark、Scala、sbt。代码是用 Scala 编写的。
如何将代码和数据集分发到工作节点上?
import java.lang.Math._
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object trainModel extends App {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("trainModel")
val sc = new SparkContext(conf)
val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
val sqlContext = new SQLContext(sc)
val df = sqlContext
.read
.option("header", "true")
.format("csv")
.load("file:///usr/local/spark/dataset/rating.csv")
val ratings = rawData.map(line => line.split(",").take(3) match {
case Array(userId, movieId, rating) =>
Rating(userId.toInt, movieId.toInt, rating.toFloat)
})
println(s"Number of Ratings in Movie file ${ratings.count()} \n")
val ratingsRDD = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
//split data into test&train
val splits = ratingsRDD.randomSplit(Array(0.8, 0.2), seed = 12345)
val trainingRatingsRDD = splits(0).cache()
val testRatingsRDD = splits(1).cache()
val numTraining = trainingRatingsRDD.count()
val numTest = testRatingsRDD.count()
println(s"Training: $numTraining, test: $numTest.")
val rank = 10
val lambdas = 0.01
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations)
//Evaluate the model on training data
val userProducts = ratings.map { case Rating(userId, movieId, rating) =>
(userId, movieId)
}
val predictions = model.predict(userProducts).map { case
Rating(userId, movieId, rating) =>
((userId, movieId), rating)
}
val ratesAndPreds = ratings.map { case Rating(userId, movieId, rating) =>
((userId, movieId),
rating)
}.join(predictions)
val meanSquaredError = ratesAndPreds.map { case ((userId, movieId),
(r1, r2)) =>
val err = r1 - r2
err * err
}.mean
println("Mean Squared Error= " + meanSquaredError)
sqrt(meanSquaredError)
val rmse = math.sqrt(meanSquaredError)
println(s" RMSE = $rmse.")
}
1 - 您的数据集最好放入分布式文件系统 - Hadoop HDFS、S3 等
2 - 代码通过 spark-submit
脚本分发,如此处所述 https://spark.apache.org/docs/2.4.3/submitting-applications.html
How to distribute code
当您 spark-submit
一个 Spark 应用程序时,就会发生这种情况。分配可以是每个 CPU core/thread 或执行者。您不必对其进行编码。这就是人们使用 Spark 的原因,因为它应该(几乎)自动发生。
conf.setMaster("local[*]")
也就是说,您使用的线程数与 CPU 内核数一样多。那是本地发行版。
您最好从代码中删除该行并改用 spark-submit --master
。阅读官方文档,尤其是。 Submitting Applications.
...and dataset into worker nodes?
val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
该行说明了 Movielens 数据集 (rating.csv
) 的分布方式。它与 Spark 无关,因为 Spark 使用文件系统上的任何分布。
换句话说,在块大小为 256MB 的 Hadoop HDFS 上 (split),一个两倍于块大小的文件可分为两部分。这是 HDFS 使文件分发和 fault-tolerant.
当 Spark 读取 2-split 文件时,分布式计算(使用 RDD 描述)将使用 2 个分区和 2 个任务。
HDFS 是一个文件系统/存储,因此选择任何位置和 hdfs -put
数据集。将 HDFS 视为您可以远程访问的任何文件系统。使用位置作为 sc.textFile
的输入参数,你就完成了。
我一直在使用数据集 Movielens(2000 万条记录)并且一直在 Spark MLlib 中使用 collaborative filtering。
我的环境是 Ubuntu VirtualBox 上的 14.4。我有一个主节点和 2 个从节点。我使用了已发布的 Apache Hadoop、Apache Spark、Scala、sbt。代码是用 Scala 编写的。
如何将代码和数据集分发到工作节点上?
import java.lang.Math._
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object trainModel extends App {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("trainModel")
val sc = new SparkContext(conf)
val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
val sqlContext = new SQLContext(sc)
val df = sqlContext
.read
.option("header", "true")
.format("csv")
.load("file:///usr/local/spark/dataset/rating.csv")
val ratings = rawData.map(line => line.split(",").take(3) match {
case Array(userId, movieId, rating) =>
Rating(userId.toInt, movieId.toInt, rating.toFloat)
})
println(s"Number of Ratings in Movie file ${ratings.count()} \n")
val ratingsRDD = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
//split data into test&train
val splits = ratingsRDD.randomSplit(Array(0.8, 0.2), seed = 12345)
val trainingRatingsRDD = splits(0).cache()
val testRatingsRDD = splits(1).cache()
val numTraining = trainingRatingsRDD.count()
val numTest = testRatingsRDD.count()
println(s"Training: $numTraining, test: $numTest.")
val rank = 10
val lambdas = 0.01
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations)
//Evaluate the model on training data
val userProducts = ratings.map { case Rating(userId, movieId, rating) =>
(userId, movieId)
}
val predictions = model.predict(userProducts).map { case
Rating(userId, movieId, rating) =>
((userId, movieId), rating)
}
val ratesAndPreds = ratings.map { case Rating(userId, movieId, rating) =>
((userId, movieId),
rating)
}.join(predictions)
val meanSquaredError = ratesAndPreds.map { case ((userId, movieId),
(r1, r2)) =>
val err = r1 - r2
err * err
}.mean
println("Mean Squared Error= " + meanSquaredError)
sqrt(meanSquaredError)
val rmse = math.sqrt(meanSquaredError)
println(s" RMSE = $rmse.")
}
1 - 您的数据集最好放入分布式文件系统 - Hadoop HDFS、S3 等
2 - 代码通过 spark-submit
脚本分发,如此处所述 https://spark.apache.org/docs/2.4.3/submitting-applications.html
How to distribute code
当您 spark-submit
一个 Spark 应用程序时,就会发生这种情况。分配可以是每个 CPU core/thread 或执行者。您不必对其进行编码。这就是人们使用 Spark 的原因,因为它应该(几乎)自动发生。
conf.setMaster("local[*]")
也就是说,您使用的线程数与 CPU 内核数一样多。那是本地发行版。
您最好从代码中删除该行并改用 spark-submit --master
。阅读官方文档,尤其是。 Submitting Applications.
...and dataset into worker nodes? val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
该行说明了 Movielens 数据集 (rating.csv
) 的分布方式。它与 Spark 无关,因为 Spark 使用文件系统上的任何分布。
换句话说,在块大小为 256MB 的 Hadoop HDFS 上 (split),一个两倍于块大小的文件可分为两部分。这是 HDFS 使文件分发和 fault-tolerant.
当 Spark 读取 2-split 文件时,分布式计算(使用 RDD 描述)将使用 2 个分区和 2 个任务。
HDFS 是一个文件系统/存储,因此选择任何位置和 hdfs -put
数据集。将 HDFS 视为您可以远程访问的任何文件系统。使用位置作为 sc.textFile
的输入参数,你就完成了。