Spark Structured Streaming shows no output on Databricks with Trigger.Once

I ran the following in a notebook on Databricks CE (Community Edition), and it writes output to a Delta table. I am using the .format("rate") source.

val streamingQuery = aggregatesDF.writeStream
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Running it with Trigger.Once, however, produces no output! The query stops after one invocation, but the table remains empty.

What is the reason?

Is it a CE limitation, or a bug?

In case it is a bug, here is the code:

import org.apache.spark.sql.streaming.Trigger

val streamingQuery = aggregatesDF.writeStream
  .trigger(Trigger.Once())
  .format("delta")
  .foreachBatch(upsertToDelta _)
  .outputMode("update")
  .start()

Could .format("rate") be the problem? It is handy for prototyping.

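Trigger.Once is not specific to Databricks; it is a standard feature of Spark Structured Streaming. The catch is that it needs a data source with history, because it processes the data that has arrived since the last execution, whereas the rate source has no history and always starts from scratch. This is easy to demonstrate: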

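df = spark.readStream.format("rate").load()
# DataStreamWriter has no save(); start(path) launches the query
df.writeStream.trigger(once=True).option("checkpointLocation", "1.cp") \
  .format("parquet").outputMode("append").start("1.parquet") \
  .awaitTermination()  # wait for the single batch to finish
spark.read.parquet("1.parquet").show()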
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
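
To make the empty result less surprising, here is a toy model in plain Python. It is not Spark's implementation, only a sketch under the assumption that a rate-like source emits rows in proportion to the time elapsed since the query started, and that a one-shot trigger plans its only batch at start time. All function names are hypothetical.

```python
# Toy model of a rate-like source: rows become available as wall-clock
# time passes after the query starts. This is NOT Spark's implementation.

def rows_available(start_s, now_s, rows_per_second=1):
    """Values a rate-like source could have emitted between start and now."""
    elapsed = max(0, int(now_s - start_s))
    return list(range(elapsed * rows_per_second))

def run_once(start_s, rows_per_second=1):
    """A one-shot trigger plans its single batch at query start (elapsed = 0)."""
    return rows_available(start_s, now_s=start_s, rows_per_second=rows_per_second)

def run_for(start_s, seconds, rows_per_second=1):
    """A long-running query sees rows accumulate while it stays up."""
    return rows_available(start_s, now_s=start_s + seconds,
                          rows_per_second=rows_per_second)

once_rows = run_once(start_s=100)
streaming_rows = run_for(start_s=100, seconds=3)
print(len(once_rows), len(streaming_rows))  # prints: 0 3
```

In this model the one-shot run has nothing to read because no time has elapsed, while a source backed by stored data (files, a Delta table) would hand over everything written since the last checkpoint.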

If you want to keep experimenting with rate, it is better to create one more table that acts as a buffer between rate and your code. Something like this:

# Create a buffer table
df = spark.readStream.format("rate").load()
df.writeStream.trigger(once=True).option("checkpointLocation", "buffer.cp") \
  .format("delta").outputMode("append").start("1.delta")

# Use the buffer table as the streaming source
bufferDF = spark.readStream.format("delta").load("1.delta")
aggregatesDF = bufferDF....
# upsertToDelta is passed as a plain function in Python (no Scala-style `_`)
streamingQuery = aggregatesDF.writeStream \
  .trigger(once=True) \
  .foreachBatch(upsertToDelta) \
  .outputMode("update") \
  .start()

P.S. Using .format("delta") together with foreachBatch makes no sense; the latter takes precedence.
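
For illustration, a writer that relies on foreachBatch alone might look like the sketch below. The original post never shows upsertToDelta, so everything here is an assumption: the target path `aggregates.delta`, the join column `key`, the checkpoint path, and both function names are hypothetical. It also assumes the target Delta table already exists and that `batch_df.sparkSession` is available (PySpark 3.3 or later).

```python
# Hypothetical names throughout: TARGET_PATH, the "key" join column, and
# both function names are illustrative and do not come from the original post.
TARGET_PATH = "aggregates.delta"

def upsert_to_delta(batch_df, batch_id):
    """Merge one micro-batch into the existing Delta table at TARGET_PATH."""
    # Imported lazily so this module loads even where Delta Lake is absent.
    from delta.tables import DeltaTable

    target = DeltaTable.forPath(batch_df.sparkSession, TARGET_PATH)
    (target.alias("t")
        .merge(batch_df.alias("s"), "s.key = t.key")
        .whenMatchedUpdateAll()       # update rows whose key already exists
        .whenNotMatchedInsertAll()    # insert rows with new keys
        .execute())

def start_upsert_query(aggregates_df, checkpoint="aggregates.cp"):
    """Start a one-shot query; there is no .format() call because
    foreachBatch itself acts as the sink."""
    return (aggregates_df.writeStream
        .trigger(once=True)
        .foreachBatch(upsert_to_delta)
        .option("checkpointLocation", checkpoint)
        .outputMode("update")
        .start())
```

Calling `start_upsert_query(aggregatesDF).awaitTermination()` would run the single batch and block until the merge has finished.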