Spark Streaming - KafkaWordCount Cannot Run on a Spark Standalone Cluster

I need your help and advice on getting the Apache Spark KafkaWordCount example to run on a standalone Spark cluster:

I can run the Spark example KafkaWordCount in local mode via

spark-submit .... --master local[4] ....

I can receive messages from a Kafka server on another node (a VM) and print the word-count results to the terminal console.

However, when I submit the application to the Spark standalone cluster (via

spark-submit .... --master spark://master:7077 ....

), I find the exception below in $SPARK_HOME/work/../../stderr on every worker node, and the results of each word-count batch are NOT printed to $SPARK_HOME/work/../../stdout on any worker node.
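
For completeness, the full command has the following shape (the jar path, Zookeeper quorum, consumer group, topic, and thread count below are placeholders; KafkaWordCount's usage is <zkQuorum> <group> <topics> <numThreads>):

$SPARK_HOME/bin/spark-submit \
  --master spark://master:7077 \
  --class org.apache.spark.examples.streaming.KafkaWordCount \
  $SPARK_HOME/lib/spark-examples-*.jar \
  mykafka:2181 my-group test 1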

Here are the resource settings in $SPARK_HOME/conf/spark-env.sh on each of my Spark worker nodes:

export SPARK_MASTER_IP=master
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=3g
export SPARK_WORKER_INSTANCES=2

I have 5 VM nodes (by hostname): mykafka, master, data1, data2, and data3.
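
In other words, each worker VM runs two worker daemons with 4 cores and 3 GB each, so (assuming data1, data2, and data3 host the workers) the cluster capacity works out to:

# Per worker VM:  2 instances x 4 cores = 8 cores, 2 x 3 GB = 6 GB
# Across data1-3: 3 VMs x 8 cores = 24 cores, 3 x 6 GB = 18 GB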

Thanks in advance for any help and advice.

Here is the RpcTimeoutException found in each worker's stderr:

16/04/11 23:07:30 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(5,[Lscala.Tuple2;@2628a359,BlockManagerId(5, data3, 34838))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval
  at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
  at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:63)
  at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59)
  at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
  at scala.util.Failure$$anonfun$recover.apply(Try.scala:185)
  at scala.util.Try$.apply(Try.scala:161)
  at scala.util.Failure.recover(Try.scala:185)
  at scala.concurrent.Future$$anonfun$recover.apply(Future.scala:324)
  ....
  ....
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds
  at org.apache.spark.rpc.netty.NettyRpcEnv$$anon.run(NettyRpcEnv.scala:242)
  ... 7 more
16/04/11 23:07:31 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
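
For reference, the warning above says this timeout is controlled by spark.executor.heartbeatInterval. Raising it at submit time would look like the sketch below (the values are guesses, and this treats the symptom rather than the underlying cause):

spark-submit \
  --conf spark.executor.heartbeatInterval=30s \
  --conf spark.network.timeout=120s \
  ....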

So I had exactly the same problem with this example, and it seems to be related to this bug: https://issues.apache.org/jira/browse/SPARK-13906

I am not sure how to set this for the example as shipped, but I have been experimenting with the code: I built a small Scala application and added an extra configuration parameter to SparkConf():

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("name") // Scala string literals use double quotes, not single quotes
  .set("spark.rpc.netty.dispatcher.numThreads", "2")

Thanks to David Gomez and the spark-user mailing list; after a lot of googling, I found the solution:

https://mail-archives.apache.org/mod_mbox/spark-user/201603.mbox/%3CCAAn_Wz1ik5YOYych92C85UNjKU28G+20s5y2AWgGrOBu-Uprdw@mail.gmail.com%3E