Don't understand Update Mode and watermark in Structured Streaming

I have the following code, which produces this output:

number: 1, count: 1
number: 2, count: 1
number: 3, count: 2
number: 6, count: 2
number: 7, count: 1

I think number: 6, count: 2 should not be output, because those events are below the watermark, but I don't understand why it is still emitted.

import java.sql.Timestamp

import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.{ForeachWriter, Row, SparkSession}

object UpdateModeWithWatermarkTest {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder()
      .appName("UpdateModeWithWatermarkTest")
      .config("spark.sql.shuffle.partitions", 1)
      .master("local[2]").getOrCreate()

    import spark.implicits._


    val inputStream = new MemoryStream[(Timestamp, Int)](1, spark.sqlContext)
    val now = 5000L

    val aggregatedStream = inputStream.toDS().toDF("created", "number")
      .withWatermark("created", "1 second")
      .groupBy("number")
      .count()

    val query = aggregatedStream.writeStream.outputMode("update")
      .foreach(new ForeachWriter[Row] {
        override def open(partitionId: Long, epochId: Long): Boolean = true

        override def process(value: Row): Unit = {
          println(s"number: ${value.getInt(0)}, count: ${value.getLong(1)}")
        }

        override def close(errorOrNull: Throwable): Unit = {}
      }).start()

    new Thread(new Runnable() {
      override def run(): Unit = {
        inputStream.addData(
          (new Timestamp(now + 5000), 1),
          (new Timestamp(now + 5000), 2),
          (new Timestamp(now + 5000), 3),
          (new Timestamp(now + 5000), 3)
        )
        while (!query.isActive) {
          Thread.sleep(50)
        }
        Thread.sleep(10000)

        // At this point, the watermark is (now + 5000) - 1 second = 9 seconds.
        // The next two events, (new Timestamp(4000L), 6) and (new Timestamp(now), 6),
        // are below the watermark, so they should be discarded and
        // number: 6, count: 2 should not be output.
        inputStream.addData((new Timestamp(4000L), 6))
        inputStream.addData(
          (new Timestamp(now), 6),
          (new Timestamp(11000), 7)
        )
      }
    }).start()

    query.awaitTermination(45000)


  }

}

It's actually not that complicated.

A watermark allows late data to be incorporated into already computed windowed results for a certain period of time. The premise is that it tracks a point in time before which no more late events are expected to arrive; if they do arrive after that point, they are discarded.

There is an excellent example of this, supplemented with nice diagrams, at https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#window-operations-on-event-time
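
The windowed pattern that guide describes looks roughly like this (just a sketch reusing the inputStream and column names from your code; the 10-second window() grouping is my own assumption and not part of your query):

import org.apache.spark.sql.functions.{col, window}

// Group by a 10-second event-time window in addition to "number".
// Because the event-time column "created" is now part of the grouping,
// the 1-second watermark lets Spark drop rows that arrive later than the
// threshold and clean up the state of windows that are already final.
val windowedCounts = inputStream.toDS().toDF("created", "number")
  .withWatermark("created", "1 second")
  .groupBy(window(col("created"), "10 seconds"), col("number"))
  .count()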

I think the official explanation of the output modes of Structured Streaming already answers your question:

Update mode - (Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.

In your case, this means that data arriving within 1 second will update the 'count' field.
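
If you want to see the difference between the modes, here is a minimal sketch (assuming a console sink instead of the ForeachWriter from your code):

val updateQuery = aggregatedStream.writeStream
  .outputMode("update")   // only rows whose count changed in this trigger are written
  .format("console")
  .start()

With outputMode("complete") the entire result table would be re-emitted on every trigger, so even unchanged counts such as number: 1, count: 1 would keep appearing.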