Error when running job that queries against Cassandra via Spark SQL through Spark Jobserver

I am trying to run a job that simply runs a query against Cassandra using Spark SQL. The job submits fine and starts fine, and the same code works when it is not run through Spark Jobserver (i.e. when launched with plain spark-submit). Can someone tell me what is wrong with my job code or configuration file that causes the error below?

{
  "status": "ERROR",
  "ERROR": {
    "errorClass": "java.util.concurrent.ExecutionException",
    "cause": "Failed to open native connection to Cassandra at {127.0.1.1}:9042",
    "stack": ["com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSes
sion(CassandraConnector.scala:155)", "com.datastax.spark.connector.cql.CassandraConnector$$anonfun.apply(CassandraConnector.scal
a:141)", "com.datastax.spark.connector.cql.CassandraConnector$$anonfun.apply(CassandraConnector.scala:141)", "com.datastax.spark
.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)", "com.datastax.spark.connector.cql.RefCountedCache
.acquire(RefCountedCache.scala:56)", "com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
", "com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:101)", "com.datastax.spark.connecto
r.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:112)", "com.datastax.spark.connector.cql.Schema$.fromCassandra(Sch
ema.scala:243)", "org.apache.spark.sql.cassandra.CassandraCatalog$$anon.load(CassandraCatalog.scala:22)", "org.apache.spark.sql.
cassandra.CassandraCatalog$$anon.load(CassandraCatalog.scala:19)", "com.google.common.cache.LocalCache$LoadingValueReference.loa
dFuture(LocalCache.java:3599)", "com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)", "com.google.common.ca
che.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)", "com.google.common.cache.LocalCache$Segment.get(LocalCache.java:225
7)", "com.google.common.cache.LocalCache.get(LocalCache.java:4000)", "com.google.common.cache.LocalCache.getOrLoad(LocalCache.java
:4004)", "com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)", "org.apache.spark.sql.cassandra.Cassand
raCatalog.lookupRelation(CassandraCatalog.scala:28)", "org.apache.spark.sql.cassandra.CassandraSQLContext$$anon.org$apache$spark
$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(CassandraSQLContext.scala:218)", "org.apache.spark.sql.catalyst.analy
sis.OverrideCatalog$$anonfun$lookupRelation.apply(Catalog.scala:161)", "org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$
anonfun$lookupRelation.apply(Catalog.scala:161)", "scala.Option.getOrElse(Option.scala:120)", "org.apache.spark.sql.catalyst.ana
lysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)", "org.apache.spark.sql.cassandra.CassandraSQLContext$$anon.lookup
Relation(CassandraSQLContext.scala:218)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.sca
la:174)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply.applyOrElse(Analyzer.scala:186)", "or
g.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply.applyOrElse(Analyzer.scala:181)", "org.apache.spar
k.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNode.scala:188)", "org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.appl
y(TreeNode.scala:188)", "org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)", "org.apache.spark.sql.
catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)", "org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun.apply(TreeNod
e.scala:208)", "scala.collection.Iterator$$anon.next(Iterator.scala:328)", "scala.collection.Iterator$class.foreach(Iterator.sc
ala:727)", "scala.collection.AbstractIterator.foreach(Iterator.scala:1157)", "scala.collection.generic.Growable$class.$plus$plus$e
q(Growable.scala:48)", "scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)", "scala.collection.mutable.Arra
yBuffer.$plus$plus$eq(ArrayBuffer.scala:47)", "scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)", "scala.colle
ction.AbstractIterator.to(Iterator.scala:1157)", "scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)", "sc
ala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)", "scala.collection.TraversableOnce$class.toArray(TraversableOnce.sc
ala:252)", "scala.collection.AbstractIterator.toArray(Iterator.scala:1157)", "org.apache.spark.sql.catalyst.trees.TreeNode.transfo
rmChildrenDown(TreeNode.scala:238)", "org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)", "org.apache
.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelatio
ns$.apply(Analyzer.scala:181)", "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)", "or
g.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:61)", "org.apache.spark.
sql.catalyst.rules.RuleExecutor$$anonfun$apply$$anonfun$apply.apply(RuleExecutor.scala:59)", "scala.collection.LinearSeqOptimi
zed$class.foldLeft(LinearSeqOptimized.scala:111)", "scala.collection.immutable.List.foldLeft(List.scala:84)", "org.apache.spark.sq
l.catalyst.rules.RuleExecutor$$anonfun$apply.apply(RuleExecutor.scala:59)", "org.apache.spark.sql.catalyst.rules.RuleExecutor$$a
nonfun$apply.apply(RuleExecutor.scala:51)", "scala.collection.immutable.List.foreach(List.scala:318)", "org.apache.spark.sql.cat
alyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)", "org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLCon
text.scala:1082)", "org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)", "org.apache.spark.sql.SQLCont
ext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)", "org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)", "org.apac
he.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:211)", "org.apache.spark.sql.cassandra.Cassandra
SQLContext.sql(CassandraSQLContext.scala:214)", "CassSparkTest$.runJob(CassSparkTest.scala:23)", "CassSparkTest$.runJob(CassSparkT
est.scala:9)", "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture.apply(JobManagerActor.sca
la:235)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)", "scala.concurrent.impl.Future$P
romiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)",
 "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)", "java.lang.Thread.run(Thread.java:745)"],
    "causingClass": "java.io.IOException",
    "message": "java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042"
  }
}
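
For reference, the spark-submit invocation that works looks roughly like this (the jar name is a placeholder, and passing the connector host via --conf is my assumption about why it works there; the master URL and connector assembly path are the ones used elsewhere in this post):

spark-submit --class CassSparkTest \
  --master spark://192.168.10.11:7077 \
  --jars /home/vagrant/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar \
  --conf spark.cassandra.connection.host=127.0.0.1 \
  cass-spark-test.jar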

Here is the job I am running:

import org.apache.spark.{SparkContext, SparkConf}
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra.CassandraSQLContext
import org.apache.spark.sql._
import spark.jobserver._
import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory

object CassSparkTest extends SparkJob {
        // Entry point when launched directly (e.g. via spark-submit) rather than
        // through the jobserver. Note: with this empty config, runJob would fail
        // at config.getString("input.sql").
        def main(args: Array[String]) {
                val sc = new SparkContext("spark://192.168.10.11:7077", "test")
                val config = ConfigFactory.parseString("")
                val results = runJob(sc, config)
                println("Results:" + results)
        }

        // Accept every job; no config validation is done
        override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
                SparkJobValid
        }

        // Run the SQL statement supplied in the job config against Cassandra
        override def runJob(sc: SparkContext, config: Config): Any = {
                val sqlC = new CassandraSQLContext(sc)
                val df = sqlC.sql(config.getString("input.sql"))
                df.collect()
        }
}
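
For context, the job above is submitted through the jobserver REST API, roughly as follows (the jar name, app name, and SQL are placeholders; port 2020 matches the jobserver config below):

# upload the job jar under an app name
curl --data-binary @cass-spark-test.jar localhost:2020/jars/cass-test

# run the job, passing the SQL statement that runJob reads from "input.sql"
curl -d 'input.sql = "SELECT * FROM mykeyspace.mytable"' \
  'localhost:2020/jobs?appName=cass-test&classPath=CassSparkTest'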

And here is my spark-jobserver configuration file:

# Template for a Spark Job Server configuration file
# When deployed these settings are loaded when job server starts
#
# Spark Cluster / Job Server configuration
spark {
  # spark.master will be passed to each job's JobContext
  master = "spark://192.168.10.11:7077"
  # master = "mesos://vm28-hulk-pub:5050"
  # master = "yarn-client"

  # Default # of CPUs for jobs to use for Spark standalone cluster
  job-number-cpus = 1

  jobserver {
    port = 2020
    jar-store-rootdir = /tmp/jobserver/jars

    jobdao = spark.jobserver.io.JobFileDAO

    filedao {
      rootdir = /tmp/spark-job-server/filedao/data
    }
  }

  # predefined Spark contexts
  # contexts {
  #   my-low-latency-context {
  #     num-cpu-cores = 1           # Number of cores to allocate.  Required.
  #     memory-per-node = 512m         # Executor memory per node, -Xmx style eg 512m, 1G, etc.
  #   }
  #   # define additional contexts here
  # }

  # universal context configuration.  These settings can be overridden, see README.md
  context-settings {
    num-cpu-cores = 1           # Number of cores to allocate.  Required.
    memory-per-node = 512m         # Executor memory per node, -Xmx style eg 512m, 1G, etc.

    # in case spark distribution should be accessed from HDFS (as opposed to being installed on every mesos slave)
    # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"
    spark-cassandra-connection-host="127.0.0.1"
    # uris of jars to be loaded into the classpath for this context. Uris is a string list, or a string separated by commas ','
    # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]

    dependent-jar-uris = ["file:///home/vagrant/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar"]

    # If you wish to pass any settings directly to the sparkConf as-is, add them here in passthrough,
    # such as hadoop connection settings that don't use the "spark." prefix
    passthrough {
      #es.nodes = "192.1.1.1"

    }
  }

  # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully
  # home = "/home/spark/spark"
}

# Note that you can use this file to define settings not only for job server,
# but for your Spark jobs as well.  Spark job configuration merges with this configuration file as defaults.

@vicg, for starters you need spark.cassandra.connection.host (periods, not dashes). Also notice that the IP in the error is "127.0.1.1", not the one in your config. You can also pass the IP when creating a context, for example:

curl -X POST 'localhost:8090/contexts/my-context?spark.cassandra.connection.host=127.0.0.1'
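
A job can then be run against that context by adding the context query parameter, for example (app name, class path, and SQL are placeholders; note that 8090 is the default jobserver port, while your config above sets port = 2020):

curl -d 'input.sql = "SELECT * FROM mykeyspace.mytable"' \
  'localhost:8090/jobs?appName=cass-test&classPath=CassSparkTest&context=my-context'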

If the above doesn't work, try this PR: https://github.com/spark-jobserver/spark-jobserver/pull/164
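
For reference, a sketch of what the corrected context-settings block might look like (quoting the key keeps HOCON from expanding the dots into nested objects; whether 127.0.0.1 is the right address depends on where Cassandra actually listens, and the PR above concerns passing such settings through to the SparkConf):

context-settings {
  num-cpu-cores = 1
  memory-per-node = 512m
  # periods, not dashes
  "spark.cassandra.connection.host" = "127.0.0.1"
  dependent-jar-uris = ["file:///home/vagrant/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar"]
}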