Spark Streaming 延迟写入 Kafka - x 分钟后

Spark Streaming write to Kafka with delay - after x minutes

我们有一个 spark Streaming 应用程序。 架构如下

Kinesis 到 Spark 到 Kafka。

Spark 应用程序正在使用 qubole/kinesis-sql 从 Kinesis 进行结构化流式处理。然后将数据聚合,然后推送到 Kafka。

我们的用例要求在推送到 Kafka 之前延迟 4 分钟。

windowing 完成了 2 分钟和 4 分钟的水印

val windowedCountsDF = messageDS
   .withWatermark("timestamp", "4 minutes")
   .groupBy(window($"timestamp", "2 minutes", "2 minutes"), $"id", $"eventType", $"topic")

每两分钟触发一次写入 Kafka

val eventFilteredQuery = windowedCountsDF
  .selectExpr("topic", "id as key", "to_json(struct(*)) AS value")
  .writeStream
  .trigger(Trigger.ProcessingTime("2 minutes"))
  .format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
  .option("checkpointLocation", checkPoint)
  .outputMode("update")
  .option("kafka.bootstrap.servers", kafkaBootstrapServers)
  .queryName("events_kafka_stream")
  .start()

我可以更改触发时间以匹配 window ,但仍有一些事件会立即推送到 kafka。

有什么方法可以在 window 完成后延迟写入 Kafka x 分钟。

谢谢

将输出模式从 update 更改为 append(默认选项)。 output 模式会将所有更新的行写入接收器,因此,是否使用水印并不重要。

但是,在 append 模式下,任何写入都需要等到水印被越过 - 这正是您想要的:

Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in withWatermark() as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed).