Spark structured stream with tumbling window delayed and duplicate data

I am trying to read data from a Kafka topic, aggregate some of it over a tumbling window, and write the result to a sink (I have been trying both Kafka and the console).

The problems I am running into are:

Why is the latency so long, and what can I do to reduce it?

Why do duplicate records from earlier windows appear, and how can I remove them?

The latency seems particularly bad with shorter windows: it was over 3 minutes when I set the window duration to 10 seconds, and around 2 minutes when the window duration was set to 60 seconds.

At the shortest window durations I also see records being "bunched up", so that when the sink receives them I get the records for several windows at once.

On the duplicate aggregated records: I do have the output mode set to complete, but my understanding was that records should only be repeated within the current window, assuming the trigger fires more than once inside it, which mine should not.

I have a processing-time trigger set to match the window duration and a watermark threshold of 10% of it (1 or 6 seconds), and I know the stream behaves as expected if I remove the tumbling window.

I understand why Spark might not be able to keep up with a trigger at a given frequency, but I would have thought 10 seconds, and certainly 60 seconds, was more than enough time to process the very limited amount of data I am testing with.

Example of data sent with a 60-second tumbling window and a processing-time trigger

(CreateTime comes from kafka-console-consumer with --property print.timestamp=true.) These arrived several minutes after I would have expected the trigger to fire, based on the CreateTime timestamps and the window.
// First record fine
CreateTime:1644329432464        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}

// Duplicate of the first record sent by Spark along with the second, even though the window is over
CreateTime:1644329505265        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644329523901        {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}

// Duplicates of the first two records sent along with the third
CreateTime:1644330082974        {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644330105990        {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
CreateTime:1644330125375        {"window":{"start":"2022-02-08T14:20:00.000Z","end":"2022-02-08T14:21:00.000Z"},"account_id":"acc0","totalAmount":333}

I sometimes see messages like the following, but there are no other WARN- or ERROR-level messages indicating a problem:

2022-02-08 14:24:45 WARN  ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 99770 milliseconds

Application data/code

Sample data looks like the following and is generated by a Python script with ts set to the current time:

{"account_id": "acc0", "txn_id": "1234500001", "amount": 111, "ts": 1644258052800}

Application code (run embedded with spark.master=local[*]):

    public void execute() throws Exception {
        final SparkSession spark = SparkSession.builder().appName("test").getOrCreate();

        final Trigger trigger = Trigger.ProcessingTime(60000);
        final OutputMode outputMode = OutputMode.Complete();
        final String windowDuration = "60 seconds";
        final String watermarkThreshold = "6 seconds";

        final String kafkaHost = "localhost:9092";
        final String kafkaInTopic = "topic-in";
        final String kafkaOutTopic = "topic-out";

        final StructType schema = new StructType(
            new StructField[] {
                new StructField("account_id", DataTypes.StringType, false, null),
                new StructField("txn_id", DataTypes.StringType, false, null),
                new StructField("amount", DataTypes.LongType, false, null),
                new StructField("ts", DataTypes.LongType, false, null)
            }
        );

        final Dataset<Row> in = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaHost)
            .option("subscribe", kafkaInTopic)
            .option("startingOffsets", "latest")
            .option("includeHeaders", "true")
            .load();

        in.printSchema();

        final Dataset<Row> process = in
            // convert kafka value payload to structured data
            .withColumn("value", functions.from_json(functions.column("value").cast(DataTypes.StringType), schema))

            .withColumn("account_id", functions.column("value.account_id"))
            .withColumn("txn_id", functions.column("value.txn_id"))
            .withColumn("amount", functions.column("value.amount"))
            .withColumn("ts", functions.column("value.ts"))

            // conversion to timestamp is by second, not ms
            .withColumn("datetime", functions.col("ts").divide(1000).cast(DataTypes.TimestampType))
            .withWatermark("datetime", watermarkThreshold)

            .groupBy(
                functions.window(functions.col("datetime"), windowDuration),
                functions.col("account_id")
            )
            .agg(functions.sum("amount").as("totalAmount"));


        process.printSchema();

        final DataStreamWriter<Row> out = process
            // required for kafka output
            .select(functions.to_json(functions.struct("*")).as("value"))

            .writeStream()
            .outputMode(outputMode)
            .trigger(trigger)
            .format("kafka")
            .option("kafka.bootstrap.servers", kafkaHost)
            .option("topic", kafkaOutTopic)
            .option("checkpointLocation", "/tmp/spark-kafka");

        LOGGER.info("STARTING STREAM");
        out.start().awaitTermination();
    }

For the long delay, it is probably due to not having enough resources to process the messages, as the warning message suggests. You can check the Spark UI to understand why; it could be data skew between partitions, or the job may need more memory or cores.
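If the Spark UI shows the micro-batches spending most of their time in the aggregation's shuffle rather than reading from Kafka, one hedged thing to try on a small local run is an explicit core count and far fewer shuffle partitions than Spark SQL's default of 200. This is only a sketch under that assumption, using standard Spark settings; whether it helps depends on what the UI actually shows:

    // Sketch only: assumes the UI shows time going to the default 200 shuffle
    // partitions created for the windowed aggregation on a tiny local dataset.
    final SparkSession spark = SparkSession.builder()
        .appName("test")
        .master("local[4]")                          // explicit core count instead of local[*]
        .config("spark.sql.shuffle.partitions", "8") // assumption: a small count is enough for this test volume
        .getOrCreate();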

For the duplicate records, you may want to try update or append mode instead. Complete mode means the whole Result Table is written to the sink after every trigger, which is why you see the duplicates. See https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
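As a minimal sketch based on the writer in the question, only the output mode needs to change. In update mode the sink receives only the windows whose aggregates changed in that trigger; in append mode each window is emitted exactly once, after the watermark (the 6-second threshold already set with withWatermark) passes the window end:

    // Same writer as in the question, with only the output mode swapped.
    final DataStreamWriter<Row> out = process
        .select(functions.to_json(functions.struct("*")).as("value"))
        .writeStream()
        .outputMode(OutputMode.Update())   // or OutputMode.Append() to emit each window once it is finalized
        .trigger(trigger)
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaHost)
        .option("topic", kafkaOutTopic)
        .option("checkpointLocation", "/tmp/spark-kafka");

Note that with append mode nothing will reach the sink for a window until the watermark has passed its end, so output is delayed by at least the window duration plus the watermark threshold by design.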