一段时间后停止 Spark Session - Pyspark

Stop Spark Session after some time - Pyspark

我正在用 spark 做一个 ETL,这有时会花费很多时间。我想在一定时间后优雅地关闭 spark 会话。

我正在 Pyspark 中编写我的代码。

try:
 df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output)
exception:
 spark.stop()

我想在上面的代码中的某个时间后停止 spark。

有没有办法在一段时间后优雅地关闭 spark 会话?

我建议使用官方 python Timer 优雅地停止 Spark 会话:

import threading

def timer_elapsed():
    print('Timer elapsed')
    if not sc._jsc.sc().isStopped():
      spark.stop()

# wait for 0.5 sec for Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()

try:
  df_final.write.partitionBy("col1","col2","col3").mode("append").format("orc").save(output
  print('Spark job finished successfully.')
except Exception as e:
  spark_timer.cancel() # stop timer, we don't need to wait if error occured
  if not sc._jsc.sc().isStopped():
    spark.stop()

注意:我们在两种情况下停止会话,如果时间已过或捕获到异常。在请求停止 Spark 上下文之前,我们使用 sc._jsc.sc().isStopped 检查上下文是否处于活动状态,它直接调用 Java API。