ERROR Error cleaning broadcast Exception
I am getting the following error while running my Spark Streaming application. It is a large application running multiple stateful (using mapWithState) and stateless operations. Isolating the error is becoming difficult because Spark itself hangs, and the only error we see is in the Spark logs rather than in the application logs themselves.
The error happens only after 4-5 minutes, with a micro-batch interval of 10 seconds.
I am using Spark 1.6.1 on an Ubuntu server with Kafka-based input and output streams.
Please note that it is not possible for me to provide the smallest possible code to reproduce this error, since it does not show up in unit test cases and the application itself is very large.
Any direction you can give to solve this issue would be helpful. Please let me know if I can provide any more information.
Error inline below:
[2017-07-11 16:15:15,338] ERROR Error cleaning broadcast 2211 (org.apache.spark.ContextCleaner)
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:63)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout.applyOrElse(RpcTimeout.scala:59)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136)
at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228)
at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45)
at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:77)
at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$$anonfun$apply$mcV$sp.apply(ContextCleaner.scala:189)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$$anonfun$apply$mcV$sp.apply(ContextCleaner.scala:180)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning.apply$mcV$sp(ContextCleaner.scala:180)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180)
at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173)
at org.apache.spark.ContextCleaner$$anon.run(ContextCleaner.scala:68)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
Your exception message clearly shows that it is an RpcTimeout: the default configuration is 120 seconds, and it should be tuned to an optimal value for your workload.
See the Spark 1.6 configuration docs.
Your error message org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds].
and
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
confirm this.
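The property named in the message is spark.rpc.askTimeout, which falls back to spark.network.timeout when it is not set. Here is a minimal sketch of raising it, assuming you build your own SparkConf and keep the 10-second batch interval from the question (the 600s value and the app name are illustrative, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Raise the RPC timeouts before any context is created.
// "600s" is an example value; tune it to your workload.
val conf = new SparkConf()
  .setAppName("my-streaming-app")        // hypothetical app name
  .set("spark.rpc.askTimeout", "600s")   // the timeout from the stack trace; default 120s
  .set("spark.network.timeout", "600s")  // fallback used for the ask timeout when unset

val ssc = new StreamingContext(conf, Seconds(10))  // 10s micro-batch, as in the question

The same properties can be passed at submit time instead, e.g. --conf spark.rpc.askTimeout=600s on spark-submit.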
For a better understanding, see the following code from RpcTimeout.scala:
/**
 * Wait for the completed result and return it. If the result is not available within this
 * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
 *
 * @param awaitable the `Awaitable` to be awaited
 * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
 *         is still not ready
 */
def awaitResult[T](awaitable: Awaitable[T]): T = {
  try {
    Await.result(awaitable, duration)
  } catch addMessageIfTimeout
}
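For completeness, the TimeoutException in the "Caused by" section is simply what scala.concurrent.Await.result throws when the future does not complete within the duration; addMessageIfTimeout then rewraps it into an RpcTimeoutException that names the controlling property. A standalone sketch of that failure mode (plain Scala, not Spark code):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// A future that never completes, to force the timeout path.
val never = Promise[Int]().future

try {
  // Throws java.util.concurrent.TimeoutException after 2 seconds,
  // mirroring the "Caused by" entry in the stack trace above.
  Await.result(never, 2.seconds)
} catch {
  case e: java.util.concurrent.TimeoutException =>
    println(s"timed out: $e")
}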