Spark Structured Streaming 批量读取检查点

Spark Structured Streaming Batch Read Checkpointing

我是 Spark 的新手,仍在学习中。我遇到的一个更困难的概念是检查点以及 Spark 如何使用它从故障中恢复。我正在使用结构化流从 Kafka 进行批量读取,并将它们作为 Parquet 文件写入 S3:

dataset
    .write()
    .mode(SaveMode.Append)
    .option("checkpointLocation", checkpointLocation)
    .partitionBy("date_hour")
    .parquet(getS3PathForTopic(topicName));

检查点位置是 S3 文件系统路径。但是,作为作业 运行s,我看不到检查点文件。在随后的 运行 秒中,我看到以下日志:

21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1

这表明之前的 运行 没有为这个 运行 挑选它们的检查点任何偏移量。所以它从最早的偏移量开始消耗。

如何让我的工作获得新的抵消?请注意,这是一个批量查询,如 here.

所述

我是这样读的:

             sparkSession
                .read()
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
                .option("subscribe", topic)
                .option("kafka.security.protocol", "SSL")
                .option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
                .option("kakfa.ssl.truststore.password", sslConfig.truststorePassword())
                .option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
                .option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
                .option("kafka.ssl.endpoint.identification.algorithm", "")
                .option("failOnDataLoss", "true");

我不确定为什么 batch Spark Structured Streaming with Kafka 现在仍然存在。如果您想使用它,那么您必须编写自己的 Offset management。看攻略,但是解释的很烂

我会说 Trigger.Once 对您来说是一个更好的用例; Offset management 由 Spark 提供,因此它不是批处理模式。