Why do id and runId change every start of a streaming query?
Every streaming query in Structured Streaming is associated with an id and a runId. Why do they change when I stop and start the following query?
// Reading datasets with records from a Kafka topic
import org.apache.spark.sql.functions._ // <-- split, to_timestamp, from_unixtime, collect_list
import spark.implicits._ // <-- enables 'value and $"event_time" column syntax
val idsPerBatch = spark.
readStream.
format("kafka").
option("subscribe", "topic1").
option("kafka.bootstrap.servers", "localhost:9092").
load.
withColumn("tokens", split('value cast "string", ",")). // <-- Kafka's value is binary; cast to string before splitting
withColumn("seconds", 'tokens(0) cast "long").
withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
withColumn("id", 'tokens(1)).
withColumn("batch", 'tokens(2) cast "int").
withWatermark(eventTime = "event_time", delayThreshold = "10 seconds"). // <-- define watermark (before groupBy!)
groupBy($"event_time"). // <-- use event_time for grouping
agg(collect_list("batch") as "batches", collect_list("id") as "ids").
withColumn("event_time", to_timestamp($"event_time")) // <-- convert to human-readable date
// start the query and display results to console
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = idsPerBatch.
writeStream.
format("console").
option("truncate", false).
trigger(Trigger.ProcessingTime(5.seconds)).
outputMode(OutputMode.Append). // <-- Append output mode
start
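A minimal way to see the change, as a hypothetical follow-up in the same spark-shell session:
// Note the identifiers of the running query
println(s"id=${sq.id} runId=${sq.runId}")
// Stop the query and start the very same pipeline again
sq.stop
val sq2 = idsPerBatch.
writeStream.
format("console").
option("truncate", false).
trigger(Trigger.ProcessingTime(5.seconds)).
outputMode(OutputMode.Append).
start
println(s"id=${sq2.id} runId=${sq2.runId}") // <-- both UUIDs differ from the first run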
The id persists across runs as part of the checkpoint metadata. Since you are using the ConsoleSink (i.e. the console output format), which does not support checkpointing, and you have not provided a checkpoint location, the id cannot be read from the checkpoint metadata file and is generated anew on every start. Quoting the scaladoc of StreamingQuery.id (emphasis mine):
Returns the unique id of this query that persists across restarts from checkpoint data. That is, this id is generated when a query is started for the first time, and will be the same every time it is restarted from checkpoint data.
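As a sketch of how to make id survive restarts (the paths below are illustrative, not from the question), use a sink that supports checkpointing, e.g. the parquet file sink, together with an explicit checkpointLocation:
// Sketch: a checkpointed file sink; the /tmp paths are hypothetical
val cq = idsPerBatch.
writeStream.
format("parquet").
option("path", "/tmp/ids-per-batch"). // <-- hypothetical output directory
option("checkpointLocation", "/tmp/checkpoint"). // <-- the id is persisted under this directory
trigger(Trigger.ProcessingTime(5.seconds)).
outputMode(OutputMode.Append).
start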
On the other hand, runId is generated anew every time the query is started or restarted:
Returns the unique id of this run of the query. That is, every start/restart of a query will generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will have the same id but different runIds.
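Putting both together, continuing the hypothetical checkpointed query above, a stop/restart cycle should keep id but change runId:
println(s"id=${cq.id} runId=${cq.runId}")
cq.stop
// Restart from the same (hypothetical) checkpoint location
val cq2 = idsPerBatch.
writeStream.
format("parquet").
option("path", "/tmp/ids-per-batch").
option("checkpointLocation", "/tmp/checkpoint").
trigger(Trigger.ProcessingTime(5.seconds)).
outputMode(OutputMode.Append).
start
println(s"id=${cq2.id} runId=${cq2.runId}") // <-- same id (recovered from checkpoint), new runId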