结构化流:水印与恰好一次语义

Structured streaming : watermark vs. exactly-once semantics

编程指南说结构化流使用适当的 sources/sinks.

保证端到端恰好一次语义

但是当作业崩溃并且我们应用了水印时,我不明白这是如何工作的。

以下是我目前设想的工作方式示例,如有任何误解,请指正。提前致谢!

示例:

Spark 作业:每 1 小时计数 # 个事件window,带有 1 小时水印。

消息:

我们启动作业,从源中读取 A、B、C,作业在 10:30am 处崩溃,然后我们才将它们写入接收器。

下午 6 点,作业恢复并知道使用保存的 checkpoint/WAL 重新处理 A、B、C。上午 10-11 点的最终计数为 3 window。

接下来,它并行读取来自Kafka、X、Y、Z的新消息,因为它们属于不同的分区。 Z 首先被处理,所以最大事件时间戳被设置为晚上 8 点。当作业读取 X 和 Y 时,它们现在位于水印之后(晚上 8 点 - 1 小时 = 晚上 7 点),因此它们作为旧数据被丢弃。晚上 8 点到 9 点的最终计数为 1,而工作在下午 12 点到 1 点 window 没有报告任何内容。我们丢失了 X 和 Y 的数据。

---例子结束---

这个场景准确吗? 如果是这样,当从 Kafka-Sspark 正常流动时,1 小时的水印可能足以处理 late/out-of-order 数据,但当 spark 作业进入 down/Kafka 连接丢失很长一段时间时则不够。避免数据丢失的唯一选择是使用比您预期的工作停止时间更长的水印吗?

Z is processed first, so the max event timestamp gets set to 8pm.

没错。尽管 Z 可能首先被计算,但水印是从当前查询迭代中的最大时间戳中减去的。这意味着 08:00 PM 将被设置为我们减去水印时间的时间,这意味着 12:00 和 12:50 将被丢弃。

From the documentation:

For a specific window starting at time T, the engine will maintain state and allow late data to update the state until (max event time seen by the engine - late threshold > T)


Would the only option to avoid data loss be to use a watermark longer than you expect the job to ever go down for

不一定。假设您将每个 Kafka 查询要读取的最大数据量设置为 100 个项目。如果您读取小批量,并且您正在从每个分区连续读取,则每个批的每个最大时间戳可能不是代理中最新消息的最大时间,这意味着您不会丢失这些消息。

水印在小批量期间是固定值。在您的示例中,由于 X、Y 和 Z 在同一个小批量中处理,因此用于此记录的水印将为 9:20am。完成后,小批量水印将更新到晚上 7 点。

下面来自实现水印功能的 design doc for the feature SPARK-18124 的引用:

To calculate the drop boundary in our trigger based execution, we have to do the following.

  • In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
  • After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger) - threshold

大概模拟会更说明:

import org.apache.hadoop.fs.Path
import java.sql.Timestamp
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming.ProcessingTime

val dir = new Path("/tmp/test-structured-streaming")
val fs = dir.getFileSystem(sc.hadoopConfiguration)
fs.mkdirs(dir)

val schema = StructType(StructField("vilue", StringType) ::
                        StructField("timestamp", TimestampType) ::
                        Nil)

val eventStream = spark
  .readStream
  .option("sep", ";")
  .option("header", "false")
  .schema(schema)
  .csv(dir.toString)

// Watermarked aggregation
val eventsCount = eventStream
  .withWatermark("timestamp", "1 hour")
  .groupBy(window($"timestamp", "1 hour"))
  .count

def writeFile(path: Path, data: String) {
  val file = fs.create(path)
  file.writeUTF(data)
  file.close()
}

// Debug query
val query = eventsCount.writeStream
  .format("console")
  .outputMode("complete")
  .option("truncate", "false")
  .trigger(ProcessingTime("5 seconds"))
  .start()

writeFile(new Path(dir, "file1"), """
  |A;2017-08-09 10:00:00
  |B;2017-08-09 10:10:00
  |C;2017-08-09 10:20:00""".stripMargin)

query.processAllAvailable()
val lp1 = query.lastProgress

// -------------------------------------------
// Batch: 0
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// +---------------------------------------------+-----+

// lp1: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T10:10:00.000Z",
//     "max" : "2017-08-09T10:20:00.000Z",
//     "min" : "2017-08-09T10:00:00.000Z",
//     "watermark" : "1970-01-01T00:00:00.000Z"
//   },
//   ...
// }


writeFile(new Path(dir, "file2"), """
  |Z;2017-08-09 20:00:00
  |X;2017-08-09 12:00:00
  |Y;2017-08-09 12:50:00""".stripMargin)

query.processAllAvailable()
val lp2 = query.lastProgress

// -------------------------------------------
// Batch: 1
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+
  
// lp2: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 3,
//   "eventTime" : {
//     "avg" : "2017-08-09T14:56:40.000Z",
//     "max" : "2017-08-09T20:00:00.000Z",
//     "min" : "2017-08-09T12:00:00.000Z",
//     "watermark" : "2017-08-09T09:20:00.000Z"
//   },
//   "stateOperators" : [ {
//     "numRowsTotal" : 3,
//     "numRowsUpdated" : 2
//   } ],
//   ...
// }

writeFile(new Path(dir, "file3"), "")

query.processAllAvailable()
val lp3 = query.lastProgress

// -------------------------------------------
// Batch: 2
// -------------------------------------------
// +---------------------------------------------+-----+
// |window                                       |count|
// +---------------------------------------------+-----+
// |[2017-08-09 10:00:00.0,2017-08-09 11:00:00.0]|3    |
// |[2017-08-09 12:00:00.0,2017-08-09 13:00:00.0]|2    |
// |[2017-08-09 20:00:00.0,2017-08-09 21:00:00.0]|1    |
// +---------------------------------------------+-----+
  
// lp3: org.apache.spark.sql.streaming.StreamingQueryProgress =
// {
//   ...
//   "numInputRows" : 0,
//   "eventTime" : {
//     "watermark" : "2017-08-09T19:00:00.000Z"
//   },
//   "stateOperators" : [ ],
//   ...
// }

query.stop()
fs.delete(dir, true)

注意批次 0 如何以水印 1970-01-01 00:00:00 开始,而批次 1 以水印 2017-08-09 09:20:00 开始(批次 0 的最大事件时间减去 1 小时)。第 2 批,虽然为空,但使用了水印 2017-08-09 19:00:00.