Custom state store provider for Apache Spark on Mesos

I have written a custom state store and state store provider for Apache Spark 2.3.0 and am trying to deploy a job with an additional parameter:

--conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider

I run Spark jobs with Marathon and Mesos, and the job fails right at startup with this exception:

java.lang.ClassNotFoundException: com.sample.state.CustomStateStoreProvider 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:235)
    at org.apache.spark.sql.execution.streaming.state.StateStoreProvider$.create(StateStore.scala:213)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.stateStoreCustomMetrics(statefulOperators.scala:121)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.metrics(statefulOperators.scala:86)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics$lzycompute(statefulOperators.scala:251)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.metrics(statefulOperators.scala:251)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:58)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SparkPlanInfo$$anonfun$fromSparkPlan.apply(SparkPlanInfo.scala:62)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.SparkPlanInfo$.fromSparkPlan(SparkPlanInfo.scala:62)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch.apply(MicroBatchExecution.scala:475)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$$anonfun$apply$mcZ$sp.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:189)

This is the command that runs the job:

/spark/bin/spark-submit \
    --repositories "http://127.0.0.1:80/sbt-all" \
    --packages com.sample:pipelines:0.1.0 \
    --class com.sample.TestApplication \
    --conf spark.sql.streaming.stateStore.providerClass=com.sample.state.CustomStateStoreProvider \
    /spark/examples/jars/spark-examples_2.11-2.3.0.jar

The classes com.sample.TestApplication and com.sample.state.CustomStateStoreProvider are both in the com.sample:pipelines:0.1.0 package; I have checked this several times. Without the spark.sql.streaming.stateStore.providerClass parameter, the application starts and runs fine.

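I have already tried setting the extra classpath for the driver and the executors, and submitting the job with the --jars parameter pointing to a JAR located in HDFS or served over HTTP.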

P.S.: When I start the job locally there are no problems; everything works in that case.

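Well, in general, the value of the spark.sql.streaming.stateStore.providerClass parameter needs to be enclosed in quotes: --conf spark.sql.streaming.stateStore.providerClass="com.sample.state.CustomStateStoreProvider". Without the quotes, the space after the value gets included in the value, so Spark looks for the class "com.sample.state.CustomStateStoreProvider " (with a space at the end of the name) and cannot find it. Everything else was fine :)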
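To illustrate why a stray trailing space is enough to trigger the ClassNotFoundException, here is a minimal sketch in plain Java. It does not use Spark at all; the class TrailingSpaceDemo and the helper canLoad are hypothetical names made up for this example, and java.lang.String stands in for the custom provider class. It only shows that Class.forName, which appears in the stack trace above, does not trim the name it is given.

```java
// Minimal sketch (not Spark code): a class name that carries a trailing
// space cannot be resolved, even if the class itself is on the classpath.
class TrailingSpaceDemo {

    // Hypothetical helper: returns true if a class with exactly this name can be loaded.
    static boolean canLoad(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String clean = "java.lang.String";
        // Stand-in for a configuration value that picked up a trailing space.
        String padded = "java.lang.String ";

        System.out.println("clean name loads:   " + canLoad(clean));
        System.out.println("padded name loads:  " + canLoad(padded));
        System.out.println("trimmed name loads: " + canLoad(padded.trim()));
    }
}
```

If you suspect the same problem in your own job, one way to check is to log the configured value between delimiters (for example wrapped in square brackets) so that a trailing space becomes visible.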