Is it necessary to submit a Spark application jar?

As the title says, I would like to know whether it is necessary to spark-submit a *.jar.

I have been using DataStax Enterprise Cassandra for a while, but now I also need to use Spark. I watched almost all of the videos in DS320: DataStax Enterprise Analytics with Apache Spark, and there is nothing about connecting to Spark remotely from a Java application.

Right now I have 3 DSE nodes running. I can connect to Spark from the spark shell, but after 2 days of trying to connect to Spark from Java code, I have given up.

Here is my Java code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Configure the application and point it at the standalone master.
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("AppName");
//sparkConf.set("spark.shuffle.blockTransferService", "nio");
//sparkConf.set("spark.driver.host", "*.*.*.*");
//sparkConf.set("spark.driver.port", "7007");
sparkConf.setMaster("spark://*.*.*.*:7077");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

The result of the connection attempt:

16/01/18 14:32:43 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from *.*.*.*/*.*.*.*:7077 is closed
16/01/18 14:32:43 WARN AppClient$ClientEndpoint: Failed to connect to master *.*.*.*:7077
java.io.IOException: Connection from *.*.*.*/*.*.*.*:7077 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:03 ERROR TransportResponseHandler: Still have 2 requests outstanding when connection from *.*.*.*/*.*.*.*:7077 is closed
16/01/18 14:33:03 WARN AppClient$ClientEndpoint: Failed to connect to master *.*.*.*:7077
java.io.IOException: Connection from *.*.*.*/*.*.*.*:7077 closed
    at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124)
    at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144)
    at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.run(AbstractChannel.java:659)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:23 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
16/01/18 14:33:23 WARN SparkDeploySchedulerBackend: Application ID is not initialized yet.
16/01/18 14:33:23 WARN AppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
16/01/18 14:33:23 ERROR MapOutputTrackerMaster: Error communicating with MapOutputTracker
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:110)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:120)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:462)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
    at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1756)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1755)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:127)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$$anonfun$run.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
16/01/18 14:33:23 ERROR Utils: Uncaught exception in thread appclient-registration-retry-thread
org.apache.spark.SparkException: Error communicating with MapOutputTracker
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:114)
    at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:120)
    at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:462)
    at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93)
    at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1756)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1755)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:127)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$$anonfun$run.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:77)
    at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:110)
    ... 18 more
16/01/18 14:33:23 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.
    at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:438)
    at org.apache.spark.scheduler.cluster.SparkDeploySchedulerBackend.dead(SparkDeploySchedulerBackend.scala:124)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint.markDead(AppClient.scala:264)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$$anonfun$run.apply$mcV$sp(AppClient.scala:134)
    at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1163)
    at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon.run(AppClient.scala:129)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I tried changing SPARK_MASTER_IP, SPARK_LOCAL_IP, and many other configuration variables, without success. Now I have found some articles about submitting jars to Spark, and I am not sure (I cannot find any evidence) whether that is the cause. Are spark-submit and the interactive shell the only ways to use Spark?

Are there any articles about this? I would appreciate any tips.

You will need a jar so that the executors can run your custom code, and you can set this jar with SparkConf.setJars. But it is not a requirement for connecting to the Spark master and setting up a Spark application. Who knows, maybe you do not want to run any custom code. (That may be a reasonable case with Spark SQL.)
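
For illustration, a minimal sketch of registering the application jar on the conf; the jar path and master address below are placeholders, not values from your cluster:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Minimal sketch: register the application jar so the executors can
// load your classes. Path and master URL are hypothetical.
SparkConf conf = new SparkConf()
        .setAppName("AppName")
        .setMaster("spark://*.*.*.*:7077")
        .setJars(new String[] { "/path/to/app.jar" });
JavaSparkContext sc = new JavaSparkContext(conf);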

You also do not need to use spark-submit.


I know nothing about DataStax, so it could be anything. But judging from the error message, your application may be trying to connect to the wrong host, or there may be a network issue. If you can reach the same Spark master with spark-shell from the same machine, that is of course not the case. Check the master logs; maybe they can tell you why the connection from your application was closed.

I highly recommend using dse spark-submit with DSE. While it is not required, it is definitely much easier than making sure that the security and classpath options set up for DSE will work with your cluster. It also provides (in my opinion) a simpler approach for configuring your SparkConf and getting jars onto the executor classpaths.
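
A typical invocation would look something like the following sketch, where the main class and jar name are placeholders for your own application:

dse spark-submit --class com.example.AppName app.jar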

Within DSE, it also automatically routes your application to the correct Spark master URL, which further simplifies setup.

If you really want to manually construct your SparkConf, be sure to map your Spark master to the output of dsetool spark-master, or its equivalent in your DSE version.
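
As a sketch, if dsetool spark-master printed spark://10.10.10.1:7077 (a hypothetical address, stand-in for your actual output), the manual setup would look like:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Minimal sketch: setMaster gets whatever `dsetool spark-master`
// reports; the address below is a hypothetical example of that output.
SparkConf conf = new SparkConf()
        .setAppName("AppName")
        .setMaster("spark://10.10.10.1:7077");
JavaSparkContext sc = new JavaSparkContext(conf);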