Spark structured streaming exactly once - 未实现 - 重复事件
Spark structured streaming exactly once - Not achieved - Duplicated events
我正在使用 Spark Structured Streaming 来消费来自 Kafka 的事件并将它们上传到 S3。
检查点在 S3 上提交:
DataFrameWriter<Row> writer = input.writeStream()
.format("orc")
.trigger(ProcessingTime(config.getProcessingTime()))
.outputMode(OutputMode.Append())
.option("truncate", false)
.option("checkpointLocation", "s3://bucket1")
.option("compression", "zlib")
.option("path", "s3://bucket2");
偏移量通过 StreamingQueryListener
提交给 Kafka :
kafkaConsumer.commitSync(topicPartitionMap);
应用程序启动后,它会从 Kafka 检索偏移映射并启动流:
reader = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
.option("subscribe", "topic1")
.option("max.poll.records", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", topicPartitionMap)
我将 topic/partition/offset
与数据一起存储在 ORC 文件中。
数据包含多个重复的事件 topic/partition/offset
。
应该如何配置流来实现恰好一次处理?
发现那些参数应该设置为true
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
Set this to 'true' when you want to use S3 for the metadata WAL
我正在使用 Spark Structured Streaming 来消费来自 Kafka 的事件并将它们上传到 S3。
检查点在 S3 上提交:
DataFrameWriter<Row> writer = input.writeStream()
.format("orc")
.trigger(ProcessingTime(config.getProcessingTime()))
.outputMode(OutputMode.Append())
.option("truncate", false)
.option("checkpointLocation", "s3://bucket1")
.option("compression", "zlib")
.option("path", "s3://bucket2");
偏移量通过 StreamingQueryListener
提交给 Kafka :
kafkaConsumer.commitSync(topicPartitionMap);
应用程序启动后,它会从 Kafka 检索偏移映射并启动流:
reader = sparkSession
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", config.getKafkaBootStrapServers())
.option("subscribe", "topic1")
.option("max.poll.records", 1000)
.option("failOnDataLoss", false)
.option("startingOffsets", topicPartitionMap)
我将 topic/partition/offset
与数据一起存储在 ORC 文件中。
数据包含多个重复的事件 topic/partition/offset
。
应该如何配置流来实现恰好一次处理?
发现那些参数应该设置为true
spark.streaming.driver.writeAheadLog.closeFileAfterWrite
和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
Set this to 'true' when you want to use S3 for the metadata WAL