AWS EMR Spark job restarts [AsyncEventQueue: Dropping event from queue appStatus.]

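My PySpark job (about 2 hours, processing 20 GB and writing 40 MB) restarts even after it has run successfully (according to the logs) and written its data to S3. I have tried PySpark 2.3.0 and 2.3.1, on emr-5.14.0 and emr-5.16.0.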

Traceback:

18/08/22 17:45:13 ERROR AsyncEventQueue: Dropping event from queue appStatus. This likely means one of the listeners is too slow and cannot keep up with the rate at which tasks are being started by the scheduler.
18/08/22 17:45:13 WARN AsyncEventQueue: Dropped 1 events from appStatus since Thu Jan 01 00:00:00 UTC 1970.
18/08/22 17:46:28 WARN AsyncEventQueue: Dropped 25523 events from appStatus since Wed Aug 22 17:45:13 UTC 2018.
18/08/22 17:47:28 WARN AsyncEventQueue: Dropped 3417 events from appStatus since Wed Aug 22 17:46:28 UTC 2018.
18/08/22 17:48:28 WARN AsyncEventQueue: Dropped 3669 events from appStatus since Wed Aug 22 17:47:28 UTC 2018.
18/08/22 17:49:28 WARN AsyncEventQueue: Dropped 7725 events from appStatus since Wed Aug 22 17:48:28 UTC 2018.
18/08/22 17:50:28 WARN AsyncEventQueue: Dropped 6609 events from appStatus since Wed Aug 22 17:49:28 UTC 2018.
18/08/22 17:53:44 WARN AsyncEventQueue: Dropped 2272 events from appStatus since Wed Aug 22 17:50:28 UTC 2018.
18/08/22 17:54:39 WARN ShutdownHookManager: ShutdownHook '$anon' timeout, java.util.concurrent.TimeoutException
java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at org.apache.hadoop.util.ShutdownHookManager.run(ShutdownHookManager.java:67)
18/08/22 17:54:39 ERROR Utils: Uncaught exception in thread pool-4-thread-1
java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1252)
    at java.lang.Thread.join(Thread.java:1326)
    at org.apache.spark.scheduler.AsyncEventQueue.stop(AsyncEventQueue.scala:135)
    at org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop.apply(LiveListenerBus.scala:219)
    at org.apache.spark.scheduler.LiveListenerBus$$anonfun$stop.apply(LiveListenerBus.scala:219)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at org.apache.spark.scheduler.LiveListenerBus.stop(LiveListenerBus.scala:219)
    at org.apache.spark.SparkContext$$anonfun$stop.apply$mcV$sp(SparkContext.scala:1922)
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1360)
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1921)
    at org.apache.spark.SparkContext$$anonfun.apply$mcV$sp(SparkContext.scala:573)
    at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$$anonfun$apply$mcV$sp.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply$mcV$sp(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll.apply(ShutdownHookManager.scala:188)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
    at org.apache.spark.util.SparkShutdownHookManager$$anon.run(ShutdownHookManager.scala:178)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Found the answer here [1].

TL;DR:

To resolve this issue, explicitly invoke sparkContext.stop() before exiting the application.
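
For a PySpark driver script, the fix above could look roughly like the sketch below. It is only an illustration: `run_and_stop`, `build_session`, `job`, and the app name are hypothetical names that are not part of the original job, and the placeholder body stands in for the real read/transform/write logic. The one real API it relies on is `SparkSession.stop()`, which stops the underlying SparkContext. The `try`/`finally` means the stop happens whether the job succeeds or raises, rather than being left to the JVM shutdown hook that timed out in the log.

```python
def run_and_stop(build_session, job):
    """Run job(spark), then always stop the session explicitly."""
    spark = build_session()
    try:
        return job(spark)
    finally:
        # Stop Spark ourselves instead of relying on the shutdown hook.
        # SparkSession.stop() stops the underlying SparkContext.
        spark.stop()


def build_session():
    # Imported lazily so the helper above can be exercised without Spark.
    from pyspark.sql import SparkSession

    # "example-job" is a placeholder app name (assumption).
    return SparkSession.builder.appName("example-job").getOrCreate()


def job(spark):
    # Placeholder for the real work (read, transform, write to S3).
    return spark.range(10).count()


# In the driver script, the entry point would be:
#     run_and_stop(build_session, job)
```

If the script already holds a `SparkContext` directly (for example as `sc`), calling `sc.stop()` in the same `finally` position serves the same purpose.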

[1] https://community.hortonworks.com/content/supportkb/208452/warn-shutdownhookmanager-shutdownhook-anon2-timeou.html