附加模式下水印聚合查询的空输出
Empty output for Watermarked Aggregation Query in Append Mode
我使用的是 Spark 2.2.0-rc1。
我有一个 Kafka topic
,我正在查询一个 运行 水印聚合,带有 1 minute
水印,用 [=] 发送给 console
16=]输出模式。
import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", "localhost:9092").
option("startingOffsets", "earliest").
option("subscribe", "topic").
load.
select(from_json(col("value").cast("string"), schema).as("value"))
select("value.*").
withWatermark("time", "1 minute").
groupBy("time").
count.
writeStream.
outputMode("append").
format("console").
start
我正在 Kafka 中推送以下数据 topic
:
{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}
我得到以下输出:
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
这是预期的行为吗?
将更多数据推送到 Kafka 应该会触发 Spark 输出一些东西。当前行为完全是内部实现造成的。
当您推送一些数据时,StreamingQuery 将生成一个批处理到 运行。当这批完成时,它会记住这批中的最大事件时间。然后在下一批,
因为您使用的是 append
模式,所以 StreamingQuery 将使用最大事件时间和水印从 StateStore 中驱逐旧值并将其输出。因此,您需要确保至少生成两个批次才能看到输出。
这是我的最佳猜测:
追加模式仅在水印过去后输出数据(例如,在本例中为 1 分钟后)。您没有设置触发器(例如 .trigger(Trigger.ProcessingTime("10 seconds")
),因此默认情况下它会尽快输出批次。因此,第一分钟所有批次都应该是空的,一分钟后的第一个批次应该包含一些内容。
另一种可能是您使用的是 groupBy("time")
而不是 groupBy(window("time", "[window duration]"))
。我相信水印应该与时间 windows 或 mapGroupsWithState 一起使用,所以我不知道在这种情况下交互是如何工作的。
我使用的是 Spark 2.2.0-rc1。
我有一个 Kafka topic
,我正在查询一个 运行 水印聚合,带有 1 minute
水印,用 [=] 发送给 console
16=]输出模式。
import org.apache.spark.sql.types._
val schema = StructType(StructField("time", TimestampType) :: Nil)
val q = spark.
readStream.
format("kafka").
option("kafka.bootstrap.servers", "localhost:9092").
option("startingOffsets", "earliest").
option("subscribe", "topic").
load.
select(from_json(col("value").cast("string"), schema).as("value"))
select("value.*").
withWatermark("time", "1 minute").
groupBy("time").
count.
writeStream.
outputMode("append").
format("console").
start
我正在 Kafka 中推送以下数据 topic
:
{"time":"2017-06-07 10:01:00.000"}
{"time":"2017-06-07 10:02:00.000"}
{"time":"2017-06-07 10:03:00.000"}
{"time":"2017-06-07 10:04:00.000"}
{"time":"2017-06-07 10:05:00.000"}
我得到以下输出:
scala> -------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----+
|time|count|
+----+-----+
+----+-----+
这是预期的行为吗?
将更多数据推送到 Kafka 应该会触发 Spark 输出一些东西。当前行为完全是内部实现造成的。
当您推送一些数据时,StreamingQuery 将生成一个批处理到 运行。当这批完成时,它会记住这批中的最大事件时间。然后在下一批,
因为您使用的是 append
模式,所以 StreamingQuery 将使用最大事件时间和水印从 StateStore 中驱逐旧值并将其输出。因此,您需要确保至少生成两个批次才能看到输出。
这是我的最佳猜测:
追加模式仅在水印过去后输出数据(例如,在本例中为 1 分钟后)。您没有设置触发器(例如 .trigger(Trigger.ProcessingTime("10 seconds")
),因此默认情况下它会尽快输出批次。因此,第一分钟所有批次都应该是空的,一分钟后的第一个批次应该包含一些内容。
另一种可能是您使用的是 groupBy("time")
而不是 groupBy(window("time", "[window duration]"))
。我相信水印应该与时间 windows 或 mapGroupsWithState 一起使用,所以我不知道在这种情况下交互是如何工作的。