什么时候在 Apache Spark StreamingQueryListeners 中触发 onQueryTerminated?

When is onQueryTerminated triggered in Apache Spark StreamingQueryListeners?

我正在开发自定义 StreamingQueryListener,我想在测试中触发它的 onQueryTerminated 方法。

这是我尝试实现的:

import org.apache.spark.sql.{ SQLContext, SparkSession }
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{ col, to_date }
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.scalatest.flatspec.AnyFlatSpec

class MyListener extends StreamingQueryListener {
  override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit       = {}
  override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit     = {}
  override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = println(event.exception)
}

class ListenerSpec extends AnyFlatSpec {

  it should "trigger onQueryTerminated" in {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    spark.streams.addListener(new MyListener())
    implicit val sqlContext: SQLContext = spark.sqlContext

    import spark.implicits._

    val stream = MemoryStream[Int]
    stream.addData(Seq(1, 3, 4))

    val query = stream
      .toDF()
      .withColumn("columnDoesntExist", to_date(col("names")))
      .writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}

但是,这不起作用,因为它引发了 AnalysisException,但流式查询的终止不会触发 onQueryTerminated 方法。

在什么情况下触发该方法并且event.exception是Some(异常)?

更新

以下代码成功触发了onQueryTerminated的执行:

val exceptionUdf = udf(() => throw new Exception())

val query = stream
      .toDF()
      .withColumn("exception", exceptionUdf())
      .writeStream
      .format("console")
      .start()

请参阅已接受的答案以了解原因。

根据“使用 Apache Spark 进行流处理”一书(由 O'Reilly 出版),onQueryTerminated 方法获取

"Called when a streaming query is stopped. The event contains id and runId fields that correlate with the start event. It also provides an exception field that contains an exception if the query failed due to an error."

当您收到 AnalysisException 时,您的查询甚至还没有开始。它只到达了 Catalyst 优化器四个阶段中的第一个阶段,即“分析”,它还没有转换成 运行 可用代码:

有关 Catalyst Optimizer 的更多详细信息。

AnalysisException 仅表示与目录相关的代码中存在问题,这正是您打算执行的操作:引用不存在的列(在目录中)。

如果您想 运行 执行 onQueryTermination 方法,您需要实现一个工作代码,但在它已经 运行ning 时失败了(例如,提供错误的数据输入类型)。