具有翻滚 window 延迟和重复数据的 Spark 结构化流
Spark structured stream with tumbling window delayed and duplicate data
我正在尝试从 kafka 主题中读取数据,通过翻滚 window 聚合一些数据并将其写入接收器(我一直在尝试使用 Kafka 和控制台)。
我遇到的问题是
- 接收器上 window 的发送数据和接收聚合记录之间的长时间延迟(预期触发器应该触发后几分钟)
- 先前 window 聚合中的重复记录出现在后续 windows
中
为什么延迟这么长,我可以做些什么来减少它?
为什么会出现之前 windows 的重复记录,我该如何删除它们?
延迟似乎特别糟糕,因为 window 变得更短 - 当我将 window 持续时间设置为 10 秒时,它是 3 分钟以上,当 [=74] 时,大约是 2 分钟=] 持续时间设置为 60 秒。
在最短的 window 次中,我还看到记录“聚集在一起”,因此当接收器接收到记录时,我一次会收到几个 windows 的记录。
在重复的聚合记录上,我确实将输出模式设置为完成,但我的理解是记录应该只在当前 window 中重复,假设触发器在其中多次触发,而我的不应该是。
我有一个处理触发器设置匹配 window 时间和 10%(1 或 6 秒)的水印阈值,我知道如果我删除翻滚 window.
我明白为什么 spark 可能无法达到特定频率的触发器,但我认为 10 秒,当然 60 秒足以处理我正在测试的非常有限的数据量。
60秒翻滚发送数据示例window处理时间触发
- 发送 6 个负载
- 等一下
- 发送 1 个负载
- 稍等
- 发送 3 个 payload
(CreateTime 来自带有 --属性 print.timestamp=true 的 kafka-console-consumer)。这些在我期望触发器根据 CreateTime 时间戳和 window.
触发后几分钟到达
// First record fine
CreateTime:1644329432464 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
// Duplicate of first record with second sent by spark even though the window is over
CreateTime:1644329505265 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644329523901 {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
// Duplicate of first 2 records with third
CreateTime:1644330082974 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644330105990 {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
CreateTime:1644330125375 {"window":{"start":"2022-02-08T14:20:00.000Z","end":"2022-02-08T14:21:00.000Z"},"account_id":"acc0","totalAmount":333}
我有时会看到如下消息,但没有其他 WARN 或 ERROR 级别的消息指示问题:
2022-02-08 14:24:45 WARN ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 99770 milliseconds
申请data/code
示例数据如下所示,由 python 脚本生成,ts 设置为当前时间:
{"account_id": "acc0", "txn_id": "1234500001", "amount": 111, "ts": 1644258052800}
应用程序代码(运行 内嵌 spark.master=local[*])
public void execute() throws Exception {
final SparkSession spark = SparkSession.builder().appName("test").getOrCreate();
final Trigger trigger = Trigger.ProcessingTime(60000);
final OutputMode outputMode = OutputMode.Complete();
final String windowDuration = "60 seconds";
final String watermarkThreshold = "6 seconds";
final String kafkaHost = "localhost:9092";
final String kafkaInTopic = "topic-in";
final String kafkaOutTopic = "topic-out";
final StructType schema = new StructType(
new StructField[] {
new StructField("account_id", DataTypes.StringType, false, null),
new StructField("txn_id", DataTypes.StringType, false, null),
new StructField("amount", DataTypes.LongType, false, null),
new StructField("ts", DataTypes.LongType, false, null)
}
);
final Dataset<Row> in = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaHost)
.option("subscribe", kafkaInTopic)
.option("startingOffsets", "latest")
.option("includeHeaders", "true")
.load();
in.printSchema();
final Dataset<Row> process = in
// convert kafka value payload to structured data
.withColumn("value", functions.from_json(functions.column("value").cast(DataTypes.StringType), schema))
.withColumn("account_id", functions.column("value.account_id"))
.withColumn("txn_id", functions.column("value.txn_id"))
.withColumn("amount", functions.column("value.amount"))
.withColumn("ts", functions.column("value.ts"))
// conversion to timestamp is by second, not ms
.withColumn("datetime", functions.col("ts").divide(1000).cast(DataTypes.TimestampType))
.withWatermark("datetime", watermarkThreshold)
.groupBy(
functions.window(functions.col("datetime"), windowDuration),
functions.col("account_id")
)
.agg(functions.sum("amount").as("totalAmount"));
process.printSchema();
final DataStreamWriter<Row> out = process
// required for kafka output
.select(functions.to_json(functions.struct("*")).as("value"))
.writeStream()
.outputMode(outputMode)
.trigger(trigger)
.format("kafka")
.option("kafka.bootstrap.servers", kafkaHost)
.option("topic", kafkaOutTopic)
.option("checkpointLocation", "/tmp/spark-kafka");
LOGGER.info("STARTING STREAM");
out.start().awaitTermination();
}
对于长时间延迟,可能是由于没有足够的资源来根据警告消息处理消息。您可以检查 spark UI 以了解原因。可能是分区之间的数据倾斜或需要更多内存或内核。
对于重复的记录,您可能想尝试update
或append
模式。 Complete
模式表示每次触发后将整个 Result Table 输出到 sink。这就是为什么你有重复项的原因。可以参考https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
我正在尝试从 kafka 主题中读取数据,通过翻滚 window 聚合一些数据并将其写入接收器(我一直在尝试使用 Kafka 和控制台)。
我遇到的问题是
- 接收器上 window 的发送数据和接收聚合记录之间的长时间延迟(预期触发器应该触发后几分钟)
- 先前 window 聚合中的重复记录出现在后续 windows 中
为什么延迟这么长,我可以做些什么来减少它?
为什么会出现之前 windows 的重复记录,我该如何删除它们?
延迟似乎特别糟糕,因为 window 变得更短 - 当我将 window 持续时间设置为 10 秒时,它是 3 分钟以上,当 [=74] 时,大约是 2 分钟=] 持续时间设置为 60 秒。
在最短的 window 次中,我还看到记录“聚集在一起”,因此当接收器接收到记录时,我一次会收到几个 windows 的记录。
在重复的聚合记录上,我确实将输出模式设置为完成,但我的理解是记录应该只在当前 window 中重复,假设触发器在其中多次触发,而我的不应该是。
我有一个处理触发器设置匹配 window 时间和 10%(1 或 6 秒)的水印阈值,我知道如果我删除翻滚 window.
我明白为什么 spark 可能无法达到特定频率的触发器,但我认为 10 秒,当然 60 秒足以处理我正在测试的非常有限的数据量。
60秒翻滚发送数据示例window处理时间触发
- 发送 6 个负载
- 等一下
- 发送 1 个负载
- 稍等
- 发送 3 个 payload
(CreateTime 来自带有 --属性 print.timestamp=true 的 kafka-console-consumer)。这些在我期望触发器根据 CreateTime 时间戳和 window.
触发后几分钟到达// First record fine
CreateTime:1644329432464 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
// Duplicate of first record with second sent by spark even though the window is over
CreateTime:1644329505265 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644329523901 {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
// Duplicate of first 2 records with third
CreateTime:1644330082974 {"window":{"start":"2022-02-08T14:09:00.000Z","end":"2022-02-08T14:10:00.000Z"},"account_id":"acc0","totalAmount":666}
CreateTime:1644330105990 {"window":{"start":"2022-02-08T14:10:00.000Z","end":"2022-02-08T14:11:00.000Z"},"account_id":"acc0","totalAmount":111}
CreateTime:1644330125375 {"window":{"start":"2022-02-08T14:20:00.000Z","end":"2022-02-08T14:21:00.000Z"},"account_id":"acc0","totalAmount":333}
我有时会看到如下消息,但没有其他 WARN 或 ERROR 级别的消息指示问题:
2022-02-08 14:24:45 WARN ProcessingTimeExecutor:69 - Current batch is falling behind. The trigger interval is 60000 milliseconds, but spent 99770 milliseconds
申请data/code
示例数据如下所示,由 python 脚本生成,ts 设置为当前时间:
{"account_id": "acc0", "txn_id": "1234500001", "amount": 111, "ts": 1644258052800}
应用程序代码(运行 内嵌 spark.master=local[*])
public void execute() throws Exception {
final SparkSession spark = SparkSession.builder().appName("test").getOrCreate();
final Trigger trigger = Trigger.ProcessingTime(60000);
final OutputMode outputMode = OutputMode.Complete();
final String windowDuration = "60 seconds";
final String watermarkThreshold = "6 seconds";
final String kafkaHost = "localhost:9092";
final String kafkaInTopic = "topic-in";
final String kafkaOutTopic = "topic-out";
final StructType schema = new StructType(
new StructField[] {
new StructField("account_id", DataTypes.StringType, false, null),
new StructField("txn_id", DataTypes.StringType, false, null),
new StructField("amount", DataTypes.LongType, false, null),
new StructField("ts", DataTypes.LongType, false, null)
}
);
final Dataset<Row> in = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaHost)
.option("subscribe", kafkaInTopic)
.option("startingOffsets", "latest")
.option("includeHeaders", "true")
.load();
in.printSchema();
final Dataset<Row> process = in
// convert kafka value payload to structured data
.withColumn("value", functions.from_json(functions.column("value").cast(DataTypes.StringType), schema))
.withColumn("account_id", functions.column("value.account_id"))
.withColumn("txn_id", functions.column("value.txn_id"))
.withColumn("amount", functions.column("value.amount"))
.withColumn("ts", functions.column("value.ts"))
// conversion to timestamp is by second, not ms
.withColumn("datetime", functions.col("ts").divide(1000).cast(DataTypes.TimestampType))
.withWatermark("datetime", watermarkThreshold)
.groupBy(
functions.window(functions.col("datetime"), windowDuration),
functions.col("account_id")
)
.agg(functions.sum("amount").as("totalAmount"));
process.printSchema();
final DataStreamWriter<Row> out = process
// required for kafka output
.select(functions.to_json(functions.struct("*")).as("value"))
.writeStream()
.outputMode(outputMode)
.trigger(trigger)
.format("kafka")
.option("kafka.bootstrap.servers", kafkaHost)
.option("topic", kafkaOutTopic)
.option("checkpointLocation", "/tmp/spark-kafka");
LOGGER.info("STARTING STREAM");
out.start().awaitTermination();
}
对于长时间延迟,可能是由于没有足够的资源来根据警告消息处理消息。您可以检查 spark UI 以了解原因。可能是分区之间的数据倾斜或需要更多内存或内核。
对于重复的记录,您可能想尝试update
或append
模式。 Complete
模式表示每次触发后将整个 Result Table 输出到 sink。这就是为什么你有重复项的原因。可以参考https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes