Strange non-critical exception when using Spark 2.4.3 (EMR 5.25.0) with Delta Lake IO 0.6.0

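I have been successfully using Spark 2.4.3 (Scala, on EMR 5.25.0) together with Delta Lake IO 0.6.0. My jobs run fine, but while doing some optimization and housekeeping I came across this strange exception. It does not seem to involve my code, and it does not stop the Spark application from completing successfully, but it does raise an eyebrow :) I have searched the Spark issues and elsewhere without finding a cause or any further hint. It happens in this job: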

20/05/13 23:34:28 INFO SparkContext: Starting job: apply at DatabricksLogging.scala:77
20/05/13 23:34:28 INFO DAGScheduler: Registering RDD 81 (apply at DatabricksLogging.scala:77)
20/05/13 23:34:28 INFO DAGScheduler: Registering RDD 96 (apply at DatabricksLogging.scala:77)
20/05/13 23:34:28 INFO DAGScheduler: Registering RDD 88 (apply at DatabricksLogging.scala:77)
20/05/13 23:34:28 INFO DAGScheduler: Registering RDD 101 (apply at DatabricksLogging.scala:77)
20/05/13 23:34:28 INFO DAGScheduler: Registering RDD 104 (apply at DatabricksLogging.scala:77)
20/05/13 23:34:28 INFO DAGScheduler: Got job 205 (apply at DatabricksLogging.scala:77) with 1 output partitions
20/05/13 23:34:28 INFO DAGScheduler: Final stage: ResultStage 1216 (apply at DatabricksLogging.scala:77)
20/05/13 23:34:28 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 1215)
20/05/13 23:34:28 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 1215)
20/05/13 23:34:28 INFO DAGScheduler: Submitting ShuffleMapStage 1212 (MapPartitionsRDD[96] at apply at DatabricksLogging.scala:77), which has no missing parents
20/05/13 23:34:29 INFO MemoryStore: Block broadcast_220 stored as values in memory (estimated size 55.2 KB, free 4.6 GB)
20/05/13 23:34:29 INFO MemoryStore: Block broadcast_220_piece0 stored as bytes in memory (estimated size 20.7 KB, free 4.6 GB)
20/05/13 23:34:29 INFO BlockManagerInfo: Added broadcast_220_piece0 in memory on ip-10-10-175-231.eu-west-1.compute.internal:43215 (size: 20.7 KB, free: 4.6 GB)
20/05/13 23:34:29 INFO SparkContext: Created broadcast 220 from broadcast at DAGScheduler.scala:1201
20/05/13 23:34:29 INFO DAGScheduler: Submitting 521 missing tasks from ShuffleMapStage 1212 (MapPartitionsRDD[96] at apply at DatabricksLogging.scala:77) (first 15 tasks are for partitions Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14))
20/05/13 23:34:29 INFO YarnClusterScheduler: Adding task set 1212.0 with 521 tasks

The exception:

20/05/13 23:36:20 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 10 to 10.10.175.48:33590
20/05/13 23:36:20 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 10 to 10.10.162.50:55798
20/05/13 23:36:20 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 10 to 10.10.174.108:42382
20/05/13 23:36:23 INFO TaskSetManager: Starting task 188.0 in stage 1214.0 (TID 22247, ip-10-10-175-231.eu-west-1.compute.internal, executor 3, partition 188, PROCESS_LOCAL, 8073 bytes)
20/05/13 23:36:23 INFO TaskSetManager: Finished task 95.0 in stage 1214.0 (TID 22154) in 4006 ms on ip-10-10-175-231.eu-west-1.compute.internal (executor 3) (1/200)
20/05/13 23:36:23 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
java.lang.ClassCastException: java.util.Collections$SynchronizedSet cannot be cast to java.util.List
    at org.apache.spark.util.JsonProtocol$.accumValueToJson(JsonProtocol.scala:348)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson.apply(JsonProtocol.scala:324)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulableInfoToJson.apply(JsonProtocol.scala:324)
    at scala.Option.map(Option.scala:146)
    at org.apache.spark.util.JsonProtocol$.accumulableInfoToJson(JsonProtocol.scala:324)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson.apply(JsonProtocol.scala:317)
    at org.apache.spark.util.JsonProtocol$$anonfun$accumulablesToJson.apply(JsonProtocol.scala:317)
    at scala.collection.immutable.List.map(List.scala:288)
    at org.apache.spark.util.JsonProtocol$.accumulablesToJson(JsonProtocol.scala:317)
    at org.apache.spark.util.JsonProtocol$.taskInfoToJson(JsonProtocol.scala:309)
    at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:149)
    at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:76)
    at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:138)
    at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:158)
    at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
    at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:91)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$super$postToAll(AsyncEventQueue.scala:92)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch.apply$mcJ$sp(AsyncEventQueue.scala:92)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch.apply(AsyncEventQueue.scala:87)
    at org.apache.spark.scheduler.AsyncEventQueue$$anonfun$org$apache$spark$scheduler$AsyncEventQueue$$dispatch.apply(AsyncEventQueue.scala:87)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
    at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:87)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon$$anonfun$run.apply$mcV$sp(AsyncEventQueue.scala:83)
    at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1302)
    at org.apache.spark.scheduler.AsyncEventQueue$$anon.run(AsyncEventQueue.scala:82)
20/05/13 23:36:24 INFO TaskSetManager: Starting task 189.0 in stage 1214.0 (TID 22248, ip-10-10-175-231.eu-west-1.compute.internal, executor 19, partition 189, PROCESS_LOCAL, 8073 bytes)
20/05/13 23:36:24 INFO TaskSetManager: Finished task 39.0 in stage 1214.0 (TID 22098) in 4276 ms on ip-10-10-175-231.eu-west-1.compute.internal (executor 19) (2/200)

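Note: I noticed that these exceptions do not occur when we load the Delta table for the first time, since the initial load obviously does not use the .merge feature of Delta Lake IO. This leads me to believe it is related to something being logged during the merge operation. Again, though, it does not seem to affect the output, because the results are as expected.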

If anyone has an idea about this behavior, it would be good to check whether this is an issue in Delta Lake IO 0.6.0. Thanks!

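This error will not affect any of your jobs, but it can get in the way of debugging when you look at the Spark UI on the Spark History Server: you may see a stage listed as active that should have completed.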
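To make the stack trace easier to read, here is a minimal sketch in plain Java of the kind of failure it reports: code that assumes a value is a java.util.List and casts it, when the value is actually a synchronized Set. The class name CastDemo, the method describe and the file name are hypothetical and only for illustration. This is not Spark's or Delta Lake's actual code, and that the real value comes from the merge operation is an assumption based on the note in the question.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for a serializer that assumes every
// non-numeric value it receives is a java.util.List.
class CastDemo {
    static String describe(Object value) {
        try {
            // Unchecked assumption: the value is a List.
            List<?> list = (List<?>) value;
            return "list of size " + list.size();
        } catch (ClassCastException e) {
            // Same exception type as in the log above.
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A synchronized Set, like the java.util.Collections$SynchronizedSet
        // named in the exception message. It does not implement List.
        Set<String> touched = Collections.synchronizedSet(new HashSet<>());
        touched.add("part-00000.parquet"); // hypothetical file name
        System.out.println(describe(touched)); // prints "ClassCastException"
    }
}
```

In the log above the exception is thrown on the listener thread (AsyncEventQueue) while EventLoggingListener serializes a task-end event, which is consistent with the tasks themselves finishing normally while the event log, and therefore the History Server view, ends up incomplete.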

This issue will be fixed in Apache Spark 2.4.7/3.0.1/3.1.0. For more details about the issue, see the following link: