附加模式下水印聚合查询的空输出

Empty output for Watermarked Aggregation Query in Append Mode

我使用的是 Spark 2.2.0-rc1。

我有一个 Kafka topic,我正在查询一个 运行 水印聚合,带有 1 minute 水印,用 [=] 发送给 console 16=]输出模式。

import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
  readStream.
  format("kafka").
  option("kafka.bootstrap.servers", "localhost:9092").
  option("startingOffsets", "earliest").
  option("subscribe", "topic").
  load.
  select(from_json(col("value").cast("string"), schema).as("value"))
  select("value.*").
  withWatermark("time", "1 minute").
  groupBy("time").
  count.
  writeStream.
  outputMode("append").
  format("console").
  start

我正在 Kafka 中推送以下数据 topic:

{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}

我得到以下输出:

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+                                                                    
|time|count|
+----+-----+
+----+-----+

这是预期的行为吗?

将更多数据推送到 Kafka 应该会触发 Spark 输出一些东西。当前行为完全是内部实现造成的。

当您推送一些数据时,StreamingQuery 将生成一个批处理到 运行。当这批完成时,它会记住这批中的最大事件时间。然后在下一批, 因为您使用的是 append 模式,所以 StreamingQuery 将使用最大事件时间和水印从 StateStore 中驱逐旧值并将其输出。因此,您需要确保至少生成两个批次才能看到输出。

这是我的最佳猜测:

追加模式仅在水印过去后输出数据(例如,在本例中为 1 分钟后)。您没有设置触发器(例如 .trigger(Trigger.ProcessingTime("10 seconds")),因此默认情况下它会尽快输出批次。因此,第一分钟所有批次都应该是空的,一分钟后的第一个批次应该包含一些内容。

另一种可能是您使用的是 groupBy("time") 而不是 groupBy(window("time", "[window duration]"))。我相信水印应该与时间 windows 或 mapGroupsWithState 一起使用,所以我不知道在这种情况下交互是如何工作的。