运行 Spark 作业服务器中的 Mllib
Running Mlib via Spark Job Server
我正在练习使用 spark 网站提供的在线资源开发示例模型。我设法使用 Spark-Shell 创建模型并 运行 它用于示例数据,但是如何在生产环境中实际 运行 模型?是通过 Spark 作业服务器吗?
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
val data = sc.textFile("hdfs://mycluster/user/Cancer.csv")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts.last.toDouble, Vectors.dense(parts.take(9).map(_.toDouble)))
}
var svm = new SVMWithSGD().setIntercept(true)
val model = svm.run(parsedData)
var predictedValue = model.predict(Vectors.dense(5,1,1,1,2,1,3,1,1))
println(predictedValue)
当我在 spark-shell 中 运行 上面的代码工作完美,但我不知道我们如何在生产环境中实际 运行 建模。我尝试通过 spark jobserver 运行 它,但出现错误,
curl -d "input.string = 1, 2, 3, 4, 5, 6, 7, 8, 9" 'ptfhadoop01v:8090/jobs?appName=SQL&classPath=spark.jobserver.SparkPredict'
我确定是因为我传递的是一个字符串值,而程序期望它是矢量元素,有人可以指导我如何实现这一点。这也是在生产环境中将数据传递给模型的方式吗?或者是其他方式。
Spark Job-server 用于生产用例,您希望在其中设计 Spark 作业的管道,并且(可选)在 REST API 上跨作业使用 SparkContext。 Sparkplug 是 Spark Job-server 的替代品,提供类似的构造。
但是,要回答关于如何在生产环境中 运行 一个(单个)Spark 作业的问题,答案是您不需要第三方库来执行此操作。您只需要构造一个 SparkContext 对象,并使用它来触发 Spark 作业。例如,对于您的代码片段,所需要的只是;
package runner
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import com.typesafe.config.{ConfigFactory, Config}
import org.apache.spark.{SparkConf, SparkContext}
/**
*
*/
object SparkRunner {
def main (args: Array[String]){
val config: Config = ConfigFactory.load("app-default-config") /*Use a library to read a config file*/
val sc: SparkContext = constructSparkContext(config)
val data = sc.textFile("hdfs://mycluster/user/Cancer.csv")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts.last.toDouble, Vectors.dense(parts.take(9).map(_.toDouble)))
}
var svm = new SVMWithSGD().setIntercept(true)
val model = svm.run(parsedData)
var predictedValue = model.predict(Vectors.dense(5,1,1,1,2,1,3,1,1))
println(predictedValue)
}
def constructSparkContext(config: Config): SparkContext = {
val conf = new SparkConf()
conf
.setMaster(config.getString("spark.master"))
.setAppName(config.getString("app.name"))
/*Set more configuration values here*/
new SparkContext(conf)
}
}
您还可以选择使用 Spark 库本身提供的 spark-submit 脚本包装器SparkSubmit。
我正在练习使用 spark 网站提供的在线资源开发示例模型。我设法使用 Spark-Shell 创建模型并 运行 它用于示例数据,但是如何在生产环境中实际 运行 模型?是通过 Spark 作业服务器吗?
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
val data = sc.textFile("hdfs://mycluster/user/Cancer.csv")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts.last.toDouble, Vectors.dense(parts.take(9).map(_.toDouble)))
}
var svm = new SVMWithSGD().setIntercept(true)
val model = svm.run(parsedData)
var predictedValue = model.predict(Vectors.dense(5,1,1,1,2,1,3,1,1))
println(predictedValue)
当我在 spark-shell 中 运行 上面的代码工作完美,但我不知道我们如何在生产环境中实际 运行 建模。我尝试通过 spark jobserver 运行 它,但出现错误,
curl -d "input.string = 1, 2, 3, 4, 5, 6, 7, 8, 9" 'ptfhadoop01v:8090/jobs?appName=SQL&classPath=spark.jobserver.SparkPredict'
我确定是因为我传递的是一个字符串值,而程序期望它是矢量元素,有人可以指导我如何实现这一点。这也是在生产环境中将数据传递给模型的方式吗?或者是其他方式。
Spark Job-server 用于生产用例,您希望在其中设计 Spark 作业的管道,并且(可选)在 REST API 上跨作业使用 SparkContext。 Sparkplug 是 Spark Job-server 的替代品,提供类似的构造。
但是,要回答关于如何在生产环境中 运行 一个(单个)Spark 作业的问题,答案是您不需要第三方库来执行此操作。您只需要构造一个 SparkContext 对象,并使用它来触发 Spark 作业。例如,对于您的代码片段,所需要的只是;
package runner
import org.apache.spark.mllib.classification.SVMWithSGD
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import com.typesafe.config.{ConfigFactory, Config}
import org.apache.spark.{SparkConf, SparkContext}
/**
*
*/
object SparkRunner {
def main (args: Array[String]){
val config: Config = ConfigFactory.load("app-default-config") /*Use a library to read a config file*/
val sc: SparkContext = constructSparkContext(config)
val data = sc.textFile("hdfs://mycluster/user/Cancer.csv")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts.last.toDouble, Vectors.dense(parts.take(9).map(_.toDouble)))
}
var svm = new SVMWithSGD().setIntercept(true)
val model = svm.run(parsedData)
var predictedValue = model.predict(Vectors.dense(5,1,1,1,2,1,3,1,1))
println(predictedValue)
}
def constructSparkContext(config: Config): SparkContext = {
val conf = new SparkConf()
conf
.setMaster(config.getString("spark.master"))
.setAppName(config.getString("app.name"))
/*Set more configuration values here*/
new SparkContext(conf)
}
}
您还可以选择使用 Spark 库本身提供的 spark-submit 脚本包装器SparkSubmit。