Spark + Cassandra connector fails with LocalNodeFirstLoadBalancingPolicy.close()

I have been trying to connect Cassandra with Spark in Scala, but I have run into several problems. Here are the versions in use:

    Spark 1.5.0
    Cassandra 2.1.9
    Scala 2.11.1

Here are the steps I followed:

- Downloaded Cassandra with the default configuration and started it with bin/cassandra -f. Cassandra starts fine and listens on 127.0.0.1.
- Added some mock data to the try table in the spark keyspace.
- Downloaded Spark and started the master with sbin/start-master.sh. I can see the master running fine at localhost:8888.
- Wrote the following build.sbt (a minimal sketch of the driver code that uses it follows the dependency list):

val sparkVersion = "1.5.0"

libraryDependencies ++= Seq(
  jdbc,
  anorm,
  cache,
  ws,
  "org.scalatest" % "scalatest_2.11" % "3.0.0-M8",
  "com.typesafe" % "scalalogging-slf4j_2.10" % "1.1.0",
  "com.typesafe.scala-logging" %% "scala-logging" % "3.1.0",
  "org.apache.spark" % "spark-core_2.11" % sparkVersion,
  "org.apache.spark" % "spark-streaming_2.11" % sparkVersion,
  "org.apache.spark" % "spark-streaming-twitter_2.11" % sparkVersion,
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion,
  "org.apache.spark" % "spark-sql_2.11" % sparkVersion,
  "com.datastax.cassandra" % "cassandra-driver-core" % "3.0.0-alpha2",
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M1",
  "org.scalatest" %% "scalatest" % "2.2.1" % "test",
  "org.mockito" % "mockito-all" % "1.9.5" % "test"
)
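
For context, here is roughly what the driver program looks like; this is a minimal sketch (the object name is arbitrary, and the keyspace/table names match the mock data above). Note that spark.cassandra.connection.host expects a Cassandra node address, while the spark://… URL is the Spark master address passed to setMaster:

    import org.apache.spark.{SparkConf, SparkContext}
    import com.datastax.spark.connector._ // adds cassandraTable to SparkContext

    object CassandraTest {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("cassandra-test")
          .setMaster("spark://127.0.0.1:7077")                  // Spark master URL (or "local[*]")
          .set("spark.cassandra.connection.host", "127.0.0.1")  // Cassandra contact point

        val sc = new SparkContext(conf)
        // Read the mock data from the `try` table in the `spark` keyspace.
        val rows = sc.cassandraTable("spark", "try")
        println(rows.count())
        sc.stop()
      }
    }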

Here is the stack trace I get with "spark.cassandra.connector.host" = "spark://127.0.0.1:7077":
[error] o.a.s.u.SparkUncaughtExceptionHandler - Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@b6391a7 rejected from java.util.concurrent.ThreadPoolExecutor@7f56044a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 3]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) ~[na:1.8.0_60]
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) [na:1.8.0_60]
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) [na:1.8.0_60]
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112) ~[na:1.8.0_60]
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters.apply(AppClient.scala:96) ~[spark-core_2.11-1.5.0.jar:1.5.0]

If I change this parameter to local[*], I get this stack trace instead:

play.api.Application$$anon: Execution exception[[RuntimeException: java.lang.AbstractMethodError: com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.close()V]]
    at play.api.Application$class.handleError(Application.scala:296) ~[play_2.11-2.3.8.jar:2.3.8]
    at play.api.DefaultApplication.handleError(Application.scala:402) [play_2.11-2.3.8.jar:2.3.8]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$$anonfun$applyOrElse.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.11-2.3.8.jar:2.3.8]
    at play.core.server.netty.PlayDefaultUpstreamHandler$$anonfun$$anonfun$applyOrElse.apply(PlayDefaultUpstreamHandler.scala:320) [play_2.11-2.3.8.jar:2.3.8]
    at scala.Option.map(Option.scala:146) [scala-library-2.11.7.jar:na]
Caused by: java.lang.RuntimeException: java.lang.AbstractMethodError: com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.close()V
    at play.api.mvc.ActionBuilder$$anon.apply(Action.scala:523) ~[play_2.11-2.3.8.jar:2.3.8]
    at play.api.mvc.Action$$anonfun$apply$$anonfun$apply$$anonfun$apply.apply(Action.scala:130) ~[play_2.11-2.3.8.jar:2.3.8]
    at play.api.mvc.Action$$anonfun$apply$$anonfun$apply$$anonfun$apply.apply(Action.scala:130) ~[play_2.11-2.3.8.jar:2.3.8]
    at play.utils.Threads$.withContextClassLoader(Threads.scala:21) ~[play_2.11-2.3.8.jar:2.3.8]
    at play.api.mvc.Action$$anonfun$apply$$anonfun$apply.apply(Action.scala:129) ~[play_2.11-2.3.8.jar:2.3.8]
Caused by: java.lang.AbstractMethodError: com.datastax.spark.connector.cql.LocalNodeFirstLoadBalancingPolicy.close()V
    at com.datastax.driver.core.Cluster$Manager.close(Cluster.java:1423) ~[cassandra-driver-core-3.0.0-alpha2.jar:na]
    at com.datastax.driver.core.Cluster$Manager.access0(Cluster.java:1171) ~[cassandra-driver-core-3.0.0-alpha2.jar:na]
    at com.datastax.driver.core.Cluster.closeAsync(Cluster.java:462) ~[cassandra-driver-core-3.0.0-alpha2.jar:na]
    at com.datastax.driver.core.Cluster.close(Cluster.java:473) ~[cassandra-driver-core-3.0.0-alpha2.jar:na]
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:163) ~[spark-cassandra-connector_2.11-1.5.0-M1.jar:1.5.0-M1]

Any idea where the problem is?

I ran into the same problem. I had

    export SPARK_MASTER_IP=master

set, and changing "master" to my IP address solved it.
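
In other words, the master URL the driver uses must exactly match the address the standalone master binds to. A minimal sketch of the driver side (the IP below is a placeholder for your own):

    import org.apache.spark.SparkConf

    // After setting SPARK_MASTER_IP to your real IP (e.g. 192.168.1.10)
    // and restarting the master, point the driver at the same address:
    val conf = new SparkConf()
      .setAppName("cassandra-test")
      .setMaster("spark://192.168.1.10:7077") // must match SPARK_MASTER_IP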

The Spark Cassandra connector supports Java driver version 2.1. Support for driver v2.2 is tracked here: https://datastax-oss.atlassian.net/browse/SPARKC-229 . It will be included in spark-cassandra-connector 1.5.0-M2, or you can build it yourself. I think the same will apply to the 3.0 Java driver. On the other hand, it is recommended to use the same Java driver version as your C* version, so for Cassandra 2.1, use Java driver 2.1.
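
As a concrete fix, the simplest route is probably to drop the explicit 3.0.0-alpha2 driver from build.sbt and let the connector pull in the 2.1.x driver it supports transitively. A sketch of the relevant lines (versions taken from the question):

    // build.sbt: no explicit cassandra-driver-core entry; the connector
    // brings in a compatible 2.1.x java driver on its own.
    libraryDependencies ++= Seq(
      "org.apache.spark"   %% "spark-core"                % "1.5.0",
      "org.apache.spark"   %% "spark-sql"                 % "1.5.0",
      "com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0-M1"
    )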