Spark SQL Query on DSE Cassandra in Scala

I want to test a Spark SQL query against a DSE Cassandra table from the Scala IDE. The query runs perfectly when I execute the jar file with dse spark-submit, but it fails when run from the Scala IDE. The error is:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: killr_video.videos; line 1 pos 14;

I suspect the Spark master is misconfigured, since I am running the master in local mode.

Here is how I create the Spark session:

val spark = SparkSession
  .builder()
  .appName("CassandraSpark")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .config("spark.cassandra.connection.port", "9042")
  .enableHiveSupport()
  .master("local")
  .getOrCreate()

But I don't know what address to set as the master. I tried setting it to "spark://127.0.0.1:7077", which I found on the web UI (localhost:7080) after starting Cassandra, but it still fails with the following error:

ERROR MapOutputTrackerMaster: Error communicating with MapOutputTracker
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:100)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:110)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:580)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:84)
    at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1797)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1290)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1796)
    at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:142)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anon.run(StandaloneAppClient.scala:131)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/05/22 11:46:44 ERROR Utils: Uncaught exception in thread appclient-registration-retry-thread
org.apache.spark.SparkException: Error communicating with MapOutputTracker
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:104)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:110)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:580)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:84)
    at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1797)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1290)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1796)
    at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:142)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anon.run(StandaloneAppClient.scala:131)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
    at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:100)
    ... 16 more
18/05/22 11:46:44 ERROR SparkContext: Error initializing SparkContext.
java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:546)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:831)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:823)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
18/05/22 11:46:44 INFO SparkContext: SparkContext already stopped.
Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:546)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:831)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:823)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)

What should I do to make this code work?

You don't need to hardcode the Cassandra IP or the master address: just create the SparkSession object and it will work. Here is working code (in Java):

SparkSession spark = SparkSession
  .builder()
  .appName("CassandraSpark")
  .getOrCreate();

Dataset<Row> sqlDF = spark.sql("select * from test.t1 limit 1000");
sqlDF.printSchema();
sqlDF.show();
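
Since the question is in Scala, here is the same idea as a minimal Scala sketch. It assumes the same test.t1 table as the Java example above and that the job is launched with dse spark-submit, which supplies the connection and master settings:

import org.apache.spark.sql.SparkSession

// No hardcoded Cassandra host or master: when launched via
// dse spark-submit, DSE injects that configuration for you.
val spark = SparkSession
  .builder()
  .appName("CassandraSpark")
  .getOrCreate()

val sqlDF = spark.sql("select * from test.t1 limit 1000")
sqlDF.printSchema()
sqlDF.show()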

In DSE, if you submit to a distributed cluster, you can specify the master as dse://?, and DSE will automatically find the current master. All possible options are described in the documentation.
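
For example, a sketch of setting that master explicitly in the session builder (assuming the bare dse://? URL with no extra options; see the DSE documentation for the parameters it accepts):

// Assumption: the dse://? master URL lets DSE resolve the
// currently active Spark master instead of a hardcoded spark:// address.
val spark = SparkSession
  .builder()
  .appName("CassandraSpark")
  .master("dse://?")
  .getOrCreate()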