如何在 Spark-YARN 上设置每个任务的最大允许执行时间?

How can I set a maximum allowed execution time per task on Spark-YARN?

我有一个很长的 运行 PySpark 结构化流作业,它读取一个 Kafka 主题,进行一些处理并将结果写回另一个 Kafka 主题。我们的 Kafka 服务器运行在另一个集群上。

它 运行 很好,但每隔几个小时它就会 冻结 ,即使在网络中 UI YARN 应用程序仍然处于状态“运行”。检查日志后,似乎是由于与 Kafka 源的一些暂时性连接问题。事实上,有问题的微批处理的所有任务都已正确完成,除了一个显示:

21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Discovered coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) for group spark-kafka-source-yyy
21/08/11 19:19:59 INFO AbstractCoordinator: Marking the coordinator my-kafka-xxx:9093 (id: 2147483646 rack: null) dead for group spark-kafka-source-yyy

故障 未被 Spark 或 YARN 检测到,任务将永远运行(最多几天)并继续每秒打印 10-20 条此类错误消息。重新启动进程可解决问题。

在这种情况下是否有可能强制Spark任务(在YARN上)失败?然后它会自动重新启动,问题应该解决了。当然,任何其他恢复 Kafka 连接的方法也可以...

我知道可以根据可接受的最大内存消耗终止 YARN 容器,但在执行时间上还没有看到类似的情况。

我还没有找到使用 YARN 的解决方案,但找到了在 Pyspark 驱动程序中使用监控循环的解决方法。如果状态在 10 分钟内未更新,循环将定期检查状态并使流媒体应用程序失败

MAX_DURATION = 10*60 # in seconds

df:DataFrame = define_my_data_stream(params)
writer:DataStreamWriter = write_to_my_kafka(df)

qy = writer.start()

prevBatch = -1
while not spark.streams.awaitAnyTermination(defaultMaxDuration):
    lastBatch = qy.lastProgress['batchId']
    if lastBatch == prevBatch:
        qy.stop()
        print("Query stopped")
        raise RuntimeError("Query '"+(qy.name or "")+"' ("+qy.id+") has stalled")
    else:
        prevBatch = lastBatch

引发异常将使 Spark 应用程序失败。然后可以通过 YARN 管理此故障,并使用以下选项重新启动应用程序以进行 spark-submit:

--conf spark.yarn.maxAppAttempts=2 \
--conf spark.yarn.am.attemptFailuresValidityInterval=1h \
--conf spark.yarn.max.executor.failures=4 \
--conf spark.yarn.executor.failuresValidityInterval=1h \

它确实有效:检测到冻结并从检查点重新启动应用程序。但是只能重启一次,就好像我没有指定failuresValidityInterval参数一样。这是另一个问题,known issue of Spark...