Spark SQL Query on DSE Cassandra in Scala
I want to test a Spark SQL query against a DSE Cassandra table from Scala IDE. The query runs perfectly when the jar file is executed with dse spark-submit.
But it throws an error when run from Scala IDE.
The error is:
Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: killr_video.videos; line 1 pos 14;
I think the Spark master is misconfigured, because I am running the master in local mode.
This is the Spark session I create:
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("CassandraSpark")
  .config("spark.cassandra.connection.host", "127.0.0.1")
  .config("spark.cassandra.connection.port", "9042")
  .enableHiveSupport()
  .master("local")
  .getOrCreate()
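But I don't know which address to set as the master. I tried setting the master address to "spark://127.0.0.1:7077", which I found in the Web UI (localhost:7080) when starting Cassandra. It still fails with the following error: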
ERROR MapOutputTrackerMaster: Error communicating with MapOutputTracker
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:100)
at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:110)
at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:580)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:84)
at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1797)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1290)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1796)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:142)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anon.run(StandaloneAppClient.scala:131)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/05/22 11:46:44 ERROR Utils: Uncaught exception in thread appclient-registration-retry-thread
org.apache.spark.SparkException: Error communicating with MapOutputTracker
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:104)
at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:110)
at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:580)
at org.apache.spark.SparkEnv.stop(SparkEnv.scala:84)
at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1797)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1290)
at org.apache.spark.SparkContext.stop(SparkContext.scala:1796)
at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:142)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:254)
at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anon.run(StandaloneAppClient.scala:131)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:212)
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:222)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:227)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:190)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:100)
... 16 more
18/05/22 11:46:44 ERROR SparkContext: Error initializing SparkContext.
java.lang.NullPointerException
at org.apache.spark.SparkContext.<init>(SparkContext.scala:546)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:831)
at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:823)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
18/05/22 11:46:44 INFO SparkContext: SparkContext already stopped.
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.SparkContext.<init>(SparkContext.scala:546)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2258)
at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:831)
at org.apache.spark.sql.SparkSession$Builder$$anonfun.apply(SparkSession.scala:823)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:823)
What should I do to make this code work?
You don't need to hard-code the Cassandra IP or the master; just create the SparkSession object and it will work. Here is working code (in Java):
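import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("CassandraSpark")
    .getOrCreate();
Dataset<Row> sqlDF = spark.sql("select * from test.t1 limit 1000");
sqlDF.printSchema();
sqlDF.show();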
In DSE, if you submit to a distributed cluster, you can specify the master as dse://? and DSE will automatically find the current master. All possible options are described in the documentation.
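For the Scala setup in the question, the same idea might look roughly like the sketch below. It is only an illustration under some assumptions: the object name CassandraSparkApp is made up, the query uses the killr_video.videos table from the question, and falling back to dse://? when no master has been supplied assumes the DSE Spark libraries are on the IDE classpath, which the answer above does not confirm.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Hypothetical application object; the name is illustrative only.
object CassandraSparkApp {
  def main(args: Array[String]): Unit = {
    // SparkConf picks up any spark.* settings passed in by dse spark-submit.
    val conf = new SparkConf().setAppName("CassandraSpark")

    // Assumption: when launched outside spark-submit (e.g. from an IDE) no
    // master is set, so fall back to dse://? and let DSE locate the master.
    if (!conf.contains("spark.master")) {
      conf.setMaster("dse://?")
    }

    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Keyspace and table taken from the question's error message.
    val videos = spark.sql("SELECT * FROM killr_video.videos LIMIT 10")
    videos.printSchema()
    videos.show()

    spark.stop()
  }
}
```

Because the master is only set when it is missing, the same jar should still honor whatever master dse spark-submit provides.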