Why is StreamingQueryStatus.isDataAvailable true when no records are available in the Kafka topic?

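I have to stop a Spark Structured Streaming query. So first, I check its status to see whether any data is available in Kafka for processing. If there is, I process it and then stop the query.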

if (query.status.isDataAvailable) {
  query.processAllAvailable()
}
query.stop()

Following Jacek Laskowski's suggestion, I changed the implementation. I have a list of streaming queries:

streamingQueryList.foreach { query =>
  query.stop()
}

Every time a query is stopped, the following exception is thrown:

java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:998)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153)
    at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:222)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:621)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply$mcV$sp(FileFormatWriter.scala:179)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:164)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write.apply(FileFormatWriter.scala:164)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:164)
    at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:123)
    at org.apache.spark.

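For debugging purposes, I printed the query's recentProgress to see which topic Spark is currently reading. I checked the topic with the console consumer: there is no data in Kafka, yet query.status.isDataAvailable prints true. And even though recentProgress is empty, the query keeps waiting for data to process, so it waits indefinitely.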
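To compare what the status flag says with what the query has actually done, it can help to dump the status together with the last progress report, including each source's start and end offsets (for a Kafka source these show the topic/partition offsets Spark is reading). The helper below is a minimal sketch; `QueryDebug` and `snapshot` are hypothetical names, while `status.message`, `status.isDataAvailable`, `status.isTriggerActive`, `lastProgress` and `SourceProgress` are part of the `StreamingQuery` API.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical debugging helper (not part of Spark): builds a readable
// snapshot of a query's current status and its most recent progress report.
object QueryDebug {
  def snapshot(query: StreamingQuery): String = {
    val status = query.status
    val sb = new StringBuilder
    sb.append(s"query=${query.name} id=${query.id} active=${query.isActive}\n")
    sb.append(s"status.message=${status.message}\n")
    sb.append(s"status.isDataAvailable=${status.isDataAvailable}\n")
    sb.append(s"status.isTriggerActive=${status.isTriggerActive}\n")
    // lastProgress is null until the first trigger has reported progress
    Option(query.lastProgress) match {
      case None =>
        sb.append("no progress reported yet\n")
      case Some(p) =>
        sb.append(s"batchId=${p.batchId} numInputRows=${p.numInputRows}\n")
        // One line per source, with the offsets covered by that batch
        p.sources.foreach { s =>
          sb.append(s"source=${s.description} start=${s.startOffset} end=${s.endOffset} rows=${s.numInputRows}\n")
        }
    }
    sb.toString
  }
}
```

Printing `QueryDebug.snapshot(query)` periodically shows whether `isDataAvailable` stays true while the reported batches keep showing zero input rows.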

I'm not sure why isDataAvailable returns true, but processAllAvailable by itself gives you exactly what you were aiming for with the if (query.status.isDataAvailable) block.

processAllAvailable(): Blocks until all available data in the source has been processed and committed to the sink.

That's your use case, isn't it?

Note, however, that "This method is intended for testing." and "this method may block forever" (quoting the scaladoc of processAllAvailable).
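If you still want "drain, then stop" behaviour without the risk of blocking forever, one option is to bound the wait yourself: poll `lastProgress` until a new progress report shows zero input rows, or until a deadline passes, and then call `stop()`. This is a heuristic sketch, not a Spark feature; `BoundedDrain` and `drainThenStop` are hypothetical names. It assumes, as I understand Spark's behaviour, that an idle query still emits progress reports with `numInputRows == 0` at the interval set by `spark.sql.streaming.noDataProgressEventInterval`, so `timeoutMs` should be longer than that interval.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper (not part of Spark): waits for an idle trigger or a
// deadline, whichever comes first, then stops the query.
object BoundedDrain {
  /** Returns true if an idle progress report was seen before the deadline. */
  def drainThenStop(query: StreamingQuery, timeoutMs: Long, pollMs: Long = 500L): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    // Ignore whatever progress report existed before we started waiting
    val before = Option(query.lastProgress).map(_.timestamp)
    var idle = false
    while (!idle && query.isActive && System.currentTimeMillis() < deadline) {
      val p = query.lastProgress
      // Idle = a report newer than the starting one that read no rows
      idle = p != null && !before.contains(p.timestamp) && p.numInputRows == 0
      if (!idle) Thread.sleep(pollMs)
    }
    // Stop in every case, so the caller never waits past the deadline
    query.stop()
    idle
  }
}
```

Unlike processAllAvailable, this never waits longer than `timeoutMs` (plus one poll interval), at the cost of possibly stopping while data is still arriving.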

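You should use awaitTermination to keep the main thread blocked (so your streaming application stays up and running), and stop your queries (and their background threads) from a separate "monitoring" thread whenever it suits you.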

awaitTermination() Waits for the termination of this query, either by query.stop() or by an exception. If the query has terminated with an exception, then the exception will be thrown.
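The split between a blocked main thread and a separate monitoring thread might look like the sketch below. The shutdown signal is modelled as a `java.util.concurrent.CountDownLatch` purely for illustration; in a real application it would be whatever your app uses to decide to shut down (a marker file, an HTTP call, a signal handler). `MonitoredApp` and `runUntilSignalled` are hypothetical names; `stop()` and `awaitTermination()` are the `StreamingQuery` methods quoted above.

```scala
import java.util.concurrent.CountDownLatch
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper (not part of Spark) illustrating the pattern:
// the main thread blocks in awaitTermination, a monitor thread calls stop().
object MonitoredApp {
  def runUntilSignalled(queries: Seq[StreamingQuery], stopSignal: CountDownLatch): Unit = {
    val monitor = new Thread(new Runnable {
      override def run(): Unit = {
        stopSignal.await()        // assumed shutdown trigger; replace with your own condition
        queries.foreach(_.stop()) // stop() blocks until the query's execution thread has stopped
      }
    }, "streaming-monitor")
    monitor.setDaemon(true)
    monitor.start()
    // Main thread: wait for every query to terminate, either via stop() or via
    // an exception, which awaitTermination rethrows here
    queries.foreach(_.awaitTermination())
  }
}
```

The main thread never calls processAllAvailable, so it cannot hang on a source that keeps reporting data. The only way out is a query terminating, either because the monitor stopped it or because it failed.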