在 Spark 作业服务器中持久化/共享 RDD
Persisting / Sharing a RDD in Spark Job Server
我希望持久化来自 spark 作业的 RDD,以便所有使用 Spark 作业服务器的后续作业都可以使用它。这是我尝试过的:
作业 1:
package spark.jobserver
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
object FirstJob extends SparkJob with NamedRddSupport {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob")
val sc = new SparkContext(conf)
val config = ConfigFactory.parseString("")
val results = runJob(sc, config)
println("Result is " + results)
}
override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid
override def runJob(sc: SparkContext, config: Config): Any = {
// the below variable is to be accessed by other jobs:
val to_be_persisted : org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq("some text"))
this.namedRdds.update("resultsRDD", to_be_persisted)
return to_be_persisted
}
}
作业 2:
package spark.jobserver
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
object NextJob extends SparkJob with NamedRddSupport {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("NextJob")
val sc = new SparkContext(conf)
val config = ConfigFactory.parseString("")
val results = runJob(sc, config)
println("Result is " + results)
}
override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid
override def runJob(sc: SparkContext, config: Config): Any = {
val rdd = this.namedRdds.get[(String, String)]("resultsRDD").get
rdd
}
}
我得到的错误是:
{
"status": "ERROR",
"result": {
"message": "None.get",
"errorClass": "java.util.NoSuchElementException",
"stack": ["scala.None$.get(Option.scala:313)", "scala.None$.get(Option.scala:311)", "spark.jobserver.NextJob$.runJob(NextJob.scala:30)", "spark.jobserver.NextJob$.runJob(NextJob.scala:16)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture.apply(JobManagerActor.scala:278)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)", "java.lang.Thread.run(Thread.java:745)"]
}
请修改上面的代码,以便 to_be_persisted
可以访问。
谢谢
编辑:
创建 spark 上下文,在编译和打包 scala 源后使用:
curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m'
调用 FirstJob 和 NextJob 使用:
curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.FirstJob&context=test-context&sync=true'
curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.NextJob&context=test-context&sync=true'
这里似乎有两个问题:
如果您使用的是最新的 spark-jobserver 版本 (0.6.2-SNAPSHOT),则存在一个关于 NamedObjects 无法正常工作的未解决的错误 - 似乎符合您的描述:https://github.com/spark-jobserver/spark-jobserver/issues/386.
您还有一个小的类型不匹配 - 在 FirstJob 中您坚持 RDD[String]
,而在 NextJob 中您试图获取 RDD[(String, String)]
- 在 NextJob 中,应该读作 val rdd = this.namedRdds.get[String]("resultsRDD").get
)。
我已经使用 spark-jobserver 版本 0.6.0 和上述小型修复程序尝试了您的代码,它有效。
我希望持久化来自 spark 作业的 RDD,以便所有使用 Spark 作业服务器的后续作业都可以使用它。这是我尝试过的:
作业 1:
package spark.jobserver
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
object FirstJob extends SparkJob with NamedRddSupport {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("FirstJob")
val sc = new SparkContext(conf)
val config = ConfigFactory.parseString("")
val results = runJob(sc, config)
println("Result is " + results)
}
override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid
override def runJob(sc: SparkContext, config: Config): Any = {
// the below variable is to be accessed by other jobs:
val to_be_persisted : org.apache.spark.rdd.RDD[String] = sc.parallelize(Seq("some text"))
this.namedRdds.update("resultsRDD", to_be_persisted)
return to_be_persisted
}
}
作业 2:
package spark.jobserver
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark._
import org.apache.spark.SparkContext._
import scala.util.Try
object NextJob extends SparkJob with NamedRddSupport {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("NextJob")
val sc = new SparkContext(conf)
val config = ConfigFactory.parseString("")
val results = runJob(sc, config)
println("Result is " + results)
}
override def validate(sc: SparkContext, config: Config): SparkJobValidation = SparkJobValid
override def runJob(sc: SparkContext, config: Config): Any = {
val rdd = this.namedRdds.get[(String, String)]("resultsRDD").get
rdd
}
}
我得到的错误是:
{
"status": "ERROR",
"result": {
"message": "None.get",
"errorClass": "java.util.NoSuchElementException",
"stack": ["scala.None$.get(Option.scala:313)", "scala.None$.get(Option.scala:311)", "spark.jobserver.NextJob$.runJob(NextJob.scala:30)", "spark.jobserver.NextJob$.runJob(NextJob.scala:16)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture.apply(JobManagerActor.scala:278)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)", "java.lang.Thread.run(Thread.java:745)"]
}
请修改上面的代码,以便 to_be_persisted
可以访问。
谢谢
编辑:
创建 spark 上下文,在编译和打包 scala 源后使用:
curl -d "" 'localhost:8090/contexts/test-context?num-cpu-cores=4&mem-per-node=512m'
调用 FirstJob 和 NextJob 使用:
curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.FirstJob&context=test-context&sync=true'
curl -d "" 'localhost:8090/jobs?appName=test&classPath=spark.jobserver.NextJob&context=test-context&sync=true'
这里似乎有两个问题:
如果您使用的是最新的 spark-jobserver 版本 (0.6.2-SNAPSHOT),则存在一个关于 NamedObjects 无法正常工作的未解决的错误 - 似乎符合您的描述:https://github.com/spark-jobserver/spark-jobserver/issues/386.
您还有一个小的类型不匹配 - 在 FirstJob 中您坚持
RDD[String]
,而在 NextJob 中您试图获取RDD[(String, String)]
- 在 NextJob 中,应该读作val rdd = this.namedRdds.get[String]("resultsRDD").get
)。
我已经使用 spark-jobserver 版本 0.6.0 和上述小型修复程序尝试了您的代码,它有效。