Spark Streaming - KafkaWordCount Cannot Run on a Spark Standalone Cluster
I need your help and advice on running the Apache Spark KafkaWordCount example on a standalone Spark cluster.
I can run the Spark example, KafkaWordCount, in local mode via
spark-submit .... --master local[4] ....
I can receive messages from a Kafka server on another node (VM) and the results are printed to the terminal console.
However, when I submit the application to the Spark standalone cluster (via
spark-submit .... --master spark://master:7077 ....
), I find exceptions in $SPARK_HOME/work/../../stderr on each worker node.
The result of each word-count batch is NOT printed to $SPARK_HOME/work/../../stdout on any worker node.
Here are the resource settings in $SPARK_HOME/conf/spark-env.sh on each of my Spark worker nodes:
export SPARK_MASTER_IP=master
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=3g
export SPARK_WORKER_INSTANCES=2
I have 5 VM nodes (by hostname): mykafka, master, data1, data2, and data3.
Thanks in advance for any help and advice.
Here is the RpcTimeoutException found in each worker's log:
16/04/11 23:07:30 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(5,[Lscala.Tuple2;@2628a359,BlockManagerId(5, data3, 34838))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at scala.util.Failure$$anonfun$recover.apply(Try.scala:185)
at scala.util.Try$.apply(Try.scala:161)
at scala.util.Failure.recover(Try.scala:185)
at scala.concurrent.Future$$anonfun$recover.apply(Future.scala:324)
....
....
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds
at org.apache.spark.rpc.netty.NettyRpcEnv$$anon.run(NettyRpcEnv.scala:242)
... 7 more
16/04/11 23:07:31 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
beat(5,[Lscala.Tuple2;@2628a359,BlockManagerId(5, data3, 34838))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59)
....
....
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds
at org.apache.spark.rpc.netty.NettyRpcEnv$$anon.run(NettyRpcEnv.scala:242)
... 7 more
So I had exactly the same problem with this example, and it appears to be related to this bug: https://issues.apache.org/jira/browse/SPARK-13906
I am not sure how to set this for the example itself, but I had been experimenting with the code, building a small Scala app, and added an extra configuration parameter to SparkConf():
val conf = new SparkConf()
  .setAppName("name")
  .set("spark.rpc.netty.dispatcher.numThreads", "2")
Thanks to David Gomez and the Spark mailing list; after much googling, I found the solution.
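For context, here is a minimal sketch of how that setting could be wired into a KafkaWordCount-style driver. It assumes the Spark 1.6-era spark-streaming-kafka receiver API (KafkaUtils.createStream); the object name and command-line arguments are placeholders for illustration, not the exact example code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical driver, loosely following the KafkaWordCount example
object KafkaWordCountWithWorkaround {
  def main(args: Array[String]): Unit = {
    // Placeholder arguments: ZooKeeper quorum, consumer group, topic list, threads per topic
    val Array(zkQuorum, group, topics, numThreads) = args

    val conf = new SparkConf()
      .setAppName("KafkaWordCount")
      // Workaround related to SPARK-13906: use at least 2 RPC dispatcher threads
      .set("spark.rpc.netty.dispatcher.numThreads", "2")

    val ssc = new StreamingContext(conf, Seconds(2))

    // One receiver stream over the given topics; keep only the message values
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

    // Simple per-batch word count, printed on the driver
    lines.flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The same property can also be passed at submit time with --conf spark.rpc.netty.dispatcher.numThreads=2 instead of hard-coding it in the application.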