如何将代码和数据集分发到工作节点上?

How to distribute code and dataset onto worker nodes?

我一直在使用数据集 Movielens(2000 万条记录)并且一直在 Spark MLlib 中使用 collaborative filtering

我的环境是 Ubuntu VirtualBox 上的 14.4。我有一个主节点和 2 个从节点。我使用了已发布的 Apache Hadoop、Apache Spark、Scala、sbt。代码是用 Scala 编写的。

如何将代码和数据集分发到工作节点上?

import java.lang.Math._

import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.recommendation.ALS.Rating
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object trainModel extends App {

  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("trainModel")
  val sc = new SparkContext(conf)

  val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")

  val sqlContext = new SQLContext(sc)
  val df = sqlContext
    .read
    .option("header", "true")
    .format("csv")
    .load("file:///usr/local/spark/dataset/rating.csv")

  val ratings = rawData.map(line => line.split(",").take(3) match {
    case Array(userId, movieId, rating) => 
      Rating(userId.toInt, movieId.toInt, rating.toFloat)
  })
  println(s"Number of Ratings in Movie file ${ratings.count()} \n")

  val ratingsRDD = sc.textFile("file:///usr/local/spark/dataset/rating.csv")
  //split data into test&train
  val splits = ratingsRDD.randomSplit(Array(0.8, 0.2), seed = 12345)
  val trainingRatingsRDD = splits(0).cache()
  val testRatingsRDD = splits(1).cache()
  val numTraining = trainingRatingsRDD.count()
  val numTest = testRatingsRDD.count()
  println(s"Training: $numTraining, test: $numTest.")

  val rank = 10
  val lambdas = 0.01
  val numIterations = 10
  val model = ALS.train(ratings, rank, numIterations)
  //Evaluate the model on training data
  val userProducts = ratings.map { case Rating(userId, movieId, rating) =>
    (userId, movieId)
  }
  val predictions = model.predict(userProducts).map { case
    Rating(userId, movieId, rating) =>
    ((userId, movieId), rating)
  }
  val ratesAndPreds = ratings.map { case Rating(userId, movieId, rating) =>
    ((userId, movieId),
      rating)
  }.join(predictions)
  val meanSquaredError = ratesAndPreds.map { case ((userId, movieId),
  (r1, r2)) =>
    val err = r1 - r2
    err * err
  }.mean
  println("Mean Squared Error= " + meanSquaredError)
  sqrt(meanSquaredError)
  val rmse = math.sqrt(meanSquaredError)
  println(s" RMSE = $rmse.")
}

1 - 您的数据集最好放入分布式文件系统 - Hadoop HDFS、S3 等

2 - 代码通过 spark-submit 脚本分发,如此处所述 https://spark.apache.org/docs/2.4.3/submitting-applications.html

How to distribute code

当您 spark-submit 一个 Spark 应用程序时,就会发生这种情况。分配可以是每个 CPU core/thread 或执行者。您不必对其进行编码。这就是人们使用 Spark 的原因,因为它应该(几乎)自动发生。

conf.setMaster("local[*]")

也就是说,您使用的线程数与 CPU 内核数一样多。那是本地发行版。

您最好从代码中删除该行并改用 spark-submit --master。阅读官方文档,尤其是。 Submitting Applications.

...and dataset into worker nodes? val rawData = sc.textFile("file:///usr/local/spark/dataset/rating.csv")

该行说明了 Movielens 数据集 (rating.csv) 的分布方式。它与 Spark 无关,因为 Spark 使用文件系统上的任何分布。

换句话说,在块大小为 256MB 的 Hadoop HDFS 上 (split),一个两倍于块大小的文件可分为两部分。这是 HDFS 使文件分发和 fault-tolerant.

当 Spark 读取 2-split 文件时,分布式计算(使用 RDD 描述)将使用 2 个分区和 2 个任务。

HDFS 是一个文件系统/存储,因此选择任何位置和 hdfs -put 数据集。将 HDFS 视为您可以远程访问的任何文件系统。使用位置作为 sc.textFile 的输入参数,你就完成了。