Spark Structured Streaming 在具有 Trigger.Once 的 Databricks 上显示没有输出
Spark Structured Streaming shows no output on Databricks with Trigger.Once
I ran this on Databricks CE in a notebook and it produces output to a
delta table. I am using .format("rate")
approach.
val streamingQuery = aggregatesDF.writeStream
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
Running this, however, produces no output! It stops after one
invocation, but the table remains empty.
原因是?
确定不是CE限制?错误?
- 这种处理方式不能运行一个单元格吗?
- 音量问题?
- 这就让人想到一个问题,难道只能在Databricks环境下使用Trigger Once吗?我假设我可以 运行 将其作为 Linux.
下的罐子
Could it be a bug, here goes:
import org.apache.spark.sql.streaming.Trigger
val streamingQuery = aggregatesDF.writeStream
.trigger(Trigger.Once())
.format("delta")
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
.format("rate")
,这可能是问题所在吗?这对于原型制作很方便。
Trigger.Once
不仅限于 Databricks - 它是 Spark Structured Streaming 的标准功能。但问题是它需要一个具有历史记录的数据源,因为它会触发自上次执行以来的数据处理,而 rate
源没有历史记录,总是从头开始。很容易显示:
df = spark.readStream.format("rate").load()
df.writeStream.trigger(once=True).option("checkpointLocation", "1.cp") \
.outputMode("append").save("1.parquet")
spark.read.parquet("1.parquet").show()
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
如果您想继续使用 rate
进行试验,最好再创建一个 table 作为 rate
和您的代码之间的缓冲区。像这样:
# Create a buffer table
df = spark.readStream.format("rate").load()
df.writeStream.trigger(once=True).option("checkpointLocation", "buffer.cp") \
.format("delta").outputMode("append").save("1.delta")
# Use buffer table
bufferDF = spark.read.stream.format("delta").load("1.delta")
aggregatesDF = bufferDF....
streamingQuery = aggregatesDF.writeStream
.trigger(once=True)
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
P.S。将 .format("delta")
与 .foreachBatch
一起使用是没有意义的 - 后者优先。
I ran this on Databricks CE in a notebook and it produces output to a delta table. I am using
.format("rate")
approach.val streamingQuery = aggregatesDF.writeStream .format("delta") .foreachBatch(upsertToDelta _) .outputMode("update") .start()
Running this, however, produces no output! It stops after one invocation, but the table remains empty.
原因是?
确定不是CE限制?错误?
- 这种处理方式不能运行一个单元格吗?
- 音量问题?
- 这就让人想到一个问题,难道只能在Databricks环境下使用Trigger Once吗?我假设我可以 运行 将其作为 Linux. 下的罐子
Could it be a bug, here goes:
import org.apache.spark.sql.streaming.Trigger val streamingQuery = aggregatesDF.writeStream .trigger(Trigger.Once()) .format("delta") .foreachBatch(upsertToDelta _) .outputMode("update") .start()
.format("rate")
,这可能是问题所在吗?这对于原型制作很方便。
Trigger.Once
不仅限于 Databricks - 它是 Spark Structured Streaming 的标准功能。但问题是它需要一个具有历史记录的数据源,因为它会触发自上次执行以来的数据处理,而 rate
源没有历史记录,总是从头开始。很容易显示:
df = spark.readStream.format("rate").load()
df.writeStream.trigger(once=True).option("checkpointLocation", "1.cp") \
.outputMode("append").save("1.parquet")
spark.read.parquet("1.parquet").show()
+---------+-----+
|timestamp|value|
+---------+-----+
+---------+-----+
如果您想继续使用 rate
进行试验,最好再创建一个 table 作为 rate
和您的代码之间的缓冲区。像这样:
# Create a buffer table
df = spark.readStream.format("rate").load()
df.writeStream.trigger(once=True).option("checkpointLocation", "buffer.cp") \
.format("delta").outputMode("append").save("1.delta")
# Use buffer table
bufferDF = spark.read.stream.format("delta").load("1.delta")
aggregatesDF = bufferDF....
streamingQuery = aggregatesDF.writeStream
.trigger(once=True)
.foreachBatch(upsertToDelta _)
.outputMode("update")
.start()
P.S。将 .format("delta")
与 .foreachBatch
一起使用是没有意义的 - 后者优先。