Why do id and runId change every start of a streaming query?

Every streaming query in Structured Streaming is associated with an id and a runId.

Why do they change when I stop and start the following query?

// Reading datasets with records from a Kafka topic
// (spark-shell imports these automatically; needed in standalone code)
import org.apache.spark.sql.functions._
import spark.implicits._

val idsPerBatch = spark.
  readStream.
  format("kafka").
  option("subscribe", "topic1").
  option("kafka.bootstrap.servers", "localhost:9092").
  load.
  withColumn("tokens", split('value, ",")).
  withColumn("seconds", 'tokens(0) cast "long").
  withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- Event time has to be a timestamp
  withColumn("id", 'tokens(1)).
  withColumn("batch", 'tokens(2) cast "int").
  withWatermark(eventTime = "event_time", delayThreshold = "10 seconds"). // <-- define watermark (before groupBy!)
  groupBy($"event_time"). // <-- use event_time for grouping
  agg(collect_list("batch") as "batches", collect_list("id") as "ids").
  withColumn("event_time", to_timestamp($"event_time")) // <-- convert to human-readable date

// start the query and display results to console
import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = idsPerBatch.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(5.seconds)).
  outputMode(OutputMode.Append). // <-- Append output mode
  start
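
Both identifiers are exposed on the StreamingQuery handle returned by start. A minimal sketch, assuming the query above is running (e.g. in spark-shell):

println(s"id:    ${sq.id}")     // a UUID, new on every start here
println(s"runId: ${sq.runId}")  // also a UUID, new on every start

// Stopping the query and re-running the writeStream block above yields
// a new id and a new runId, because no checkpoint location was given.
sq.stop()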

The id is persisted across runs as part of the checkpoint metadata.

Since you are using the ConsoleSink (i.e. console output), which doesn't support checkpointing, and no checkpoint location was provided, the id cannot be recovered from the checkpoint metadata file, so a new one is generated on every start (emphasis mine):

Returns the unique id of this query that persists across restarts from checkpoint data. That is, this id is generated when a query is started for the first time, and will be the same every time it is restarted from checkpoint data
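
If the id should survive restarts, the query has to be started with a sink that supports checkpointing and an explicit checkpointLocation. A sketch using the json file sink instead of the console sink; the /tmp paths are made-up example locations (it reuses the imports and idsPerBatch from above):

val sqCheckpointed = idsPerBatch.
  writeStream.
  format("json").
  option("path", "/tmp/ids-per-batch-output").                   // <-- where the sink writes its files
  option("checkpointLocation", "/tmp/ids-per-batch-checkpoint"). // <-- id is stored under this directory
  trigger(Trigger.ProcessingTime(5.seconds)).
  outputMode(OutputMode.Append).
  start

After the first start, the checkpoint directory contains the metadata with the generated id; every restart that points at the same checkpointLocation reuses it.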

The runId, on the other hand, is generated every time the query is started or restarted:

Returns the unique id of this run of the query. That is, every start/restart of a query will generate a unique runId. Therefore, every time a query is restarted from checkpoint, it will have the same id but different runIds.
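
Both behaviours can be verified by stopping the checkpointed query from the sketch above and starting it again with the same checkpointLocation (again a sketch under the same assumptions):

val idBefore = sqCheckpointed.id
val runIdBefore = sqCheckpointed.runId
sqCheckpointed.stop()

// Start the "same" query again, pointing at the same checkpoint location.
val sqRestarted = idsPerBatch.
  writeStream.
  format("json").
  option("path", "/tmp/ids-per-batch-output").
  option("checkpointLocation", "/tmp/ids-per-batch-checkpoint").
  trigger(Trigger.ProcessingTime(5.seconds)).
  outputMode(OutputMode.Append).
  start

assert(sqRestarted.id == idBefore)        // id is recovered from the checkpoint metadata
assert(sqRestarted.runId != runIdBefore)  // runId is freshly generated for every run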