Spark structured streaming exactly once - Not achieved - Duplicated events

I am using Spark Structured Streaming to consume events from Kafka and upload them to S3.

Checkpoints are committed on S3:

DataStreamWriter<Row> writer = input.writeStream()
           .format("orc")
           .trigger(ProcessingTime(config.getProcessingTime()))
           .outputMode(OutputMode.Append())
           .option("truncate", false)
           .option("checkpointLocation", "s3://bucket1")
           .option("compression", "zlib")
           .option("path", "s3://bucket2");

Offsets are committed to Kafka through a StreamingQueryListener:

  kafkaConsumer.commitSync(topicPartitionMap);
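
For reference, a sketch of what such a listener can look like. The original post only shows the commitSync call; the Jackson-based parsing of the batch progress and the consumer handle are assumptions:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.spark.sql.streaming.SourceProgress;
    import org.apache.spark.sql.streaming.StreamingQueryListener;

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    // Sketch (assumption): after every micro-batch, read the end offsets reported
    // by the Kafka source and commit them back to Kafka. The consumer must only be
    // used from this listener's thread, since KafkaConsumer is not thread-safe.
    public class OffsetCommitListener extends StreamingQueryListener {

        private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
        private final ObjectMapper mapper = new ObjectMapper();

        public OffsetCommitListener(KafkaConsumer<byte[], byte[]> kafkaConsumer) {
            this.kafkaConsumer = kafkaConsumer;
        }

        @Override
        public void onQueryProgress(StreamingQueryListener.QueryProgressEvent event) {
            Map<TopicPartition, OffsetAndMetadata> topicPartitionMap = new HashMap<>();
            for (SourceProgress source : event.progress().sources()) {
                try {
                    // endOffset() is a JSON string such as {"topic1":{"0":123,"1":456}}
                    JsonNode topics = mapper.readTree(source.endOffset());
                    topics.fields().forEachRemaining(topic ->
                            topic.getValue().fields().forEachRemaining(partition ->
                                    topicPartitionMap.put(
                                            new TopicPartition(topic.getKey(),
                                                    Integer.parseInt(partition.getKey())),
                                            new OffsetAndMetadata(partition.getValue().asLong()))));
                } catch (Exception e) {
                    // In this sketch, unparsable progress entries are simply skipped.
                }
            }
            if (!topicPartitionMap.isEmpty()) {
                kafkaConsumer.commitSync(topicPartitionMap);
            }
        }

        @Override
        public void onQueryStarted(StreamingQueryListener.QueryStartedEvent event) { }

        @Override
        public void onQueryTerminated(StreamingQueryListener.QueryTerminatedEvent event) { }
    }

Such a listener would then be registered with sparkSession.streams().addListener(new OffsetCommitListener(kafkaConsumer)).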

When the application starts, it retrieves the offset map from Kafka and starts the stream:

reader = sparkSession
           .readStream()
           .format("kafka")
           .option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
           .option("subscribe", "topic1")
           .option("max.poll.records", 1000)
           .option("failOnDataLoss", false)
           .option("startingOffsets", topicPartitionMap)

I store the topic/partition/offset together with the data in the ORC files.
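
The Kafka source exposes topic, partition and offset as columns alongside key and value, so a projection along these lines can carry them into the ORC output (the payload cast and alias are illustrative):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    // Sketch (assumption): keep the Kafka metadata columns next to the payload so
    // every ORC row records where it came from; "input" is the DataFrame written
    // by the writer shown earlier.
    Dataset<Row> input = reader.load()
            .selectExpr("CAST(value AS STRING) AS payload",
                        "topic", "partition", "offset");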

The data contains multiple duplicated events with the same topic/partition/offset.

How should the stream be configured to achieve exactly-once processing?

I found that these parameters should be set to true: spark.streaming.driver.writeAheadLog.closeFileAfterWrite and spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.

Set this to 'true' when you want to use S3 for the metadata WAL
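
For reference, a sketch of setting those flags when the session is built (they can equally be passed as --conf options to spark-submit); the application name below is hypothetical:

    import org.apache.spark.sql.SparkSession;

    // Sketch (assumption): enable closing the write-ahead-log files after each write,
    // as recommended when the WAL metadata lives on S3.
    SparkSession sparkSession = SparkSession.builder()
            .appName("kafka-to-s3")  // hypothetical application name
            .config("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
            .config("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
            .getOrCreate();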

https://spark.apache.org/docs/latest/configuration.html

More details here: https://www.waitingforcode.com/apache-spark-streaming/spark-streaming-configuration/read