Spark Structured Streaming 批量读取检查点
Spark Structured Streaming Batch Read Checkpointing
我是 Spark 的新手,仍在学习中。我遇到的一个更困难的概念是检查点以及 Spark 如何使用它从故障中恢复。我正在使用结构化流从 Kafka 进行批量读取,并将它们作为 Parquet 文件写入 S3:
dataset
.write()
.mode(SaveMode.Append)
.option("checkpointLocation", checkpointLocation)
.partitionBy("date_hour")
.parquet(getS3PathForTopic(topicName));
检查点位置是 S3 文件系统路径。但是,作为作业 运行s,我看不到检查点文件。在随后的 运行 秒中,我看到以下日志:
21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1
这表明之前的 运行 没有为这个 运行 挑选它们的检查点任何偏移量。所以它从最早的偏移量开始消耗。
如何让我的工作获得新的抵消?请注意,这是一个批量查询,如 here.
所述
我是这样读的:
sparkSession
.read()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
.option("subscribe", topic)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
.option("kakfa.ssl.truststore.password", sslConfig.truststorePassword())
.option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
.option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("failOnDataLoss", "true");
我不确定为什么 batch
Spark Structured Streaming with Kafka 现在仍然存在。如果您想使用它,那么您必须编写自己的 Offset management
。看攻略,但是解释的很烂
我会说 Trigger.Once
对您来说是一个更好的用例; Offset management
由 Spark 提供,因此它不是批处理模式。
我是 Spark 的新手,仍在学习中。我遇到的一个更困难的概念是检查点以及 Spark 如何使用它从故障中恢复。我正在使用结构化流从 Kafka 进行批量读取,并将它们作为 Parquet 文件写入 S3:
dataset
.write()
.mode(SaveMode.Append)
.option("checkpointLocation", checkpointLocation)
.partitionBy("date_hour")
.parquet(getS3PathForTopic(topicName));
检查点位置是 S3 文件系统路径。但是,作为作业 运行s,我看不到检查点文件。在随后的 运行 秒中,我看到以下日志:
21/10/14 12:20:51 INFO ConsumerCoordinator: [Consumer clientId=consumer-spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0-5, groupId=spark-kafka-relation-54f0cc87-e437-4582-b998-a33189e90bd7-driver-0] Found no committed offset for partition topic-1
这表明之前的 运行 没有为这个 运行 挑选它们的检查点任何偏移量。所以它从最早的偏移量开始消耗。
如何让我的工作获得新的抵消?请注意,这是一个批量查询,如 here.
所述我是这样读的:
sparkSession
.read()
.format("kafka")
.option("kafka.bootstrap.servers", kafkaProperties.bootstrapServers())
.option("subscribe", topic)
.option("kafka.security.protocol", "SSL")
.option("kafka.ssl.truststore.location", sslConfig.truststoreLocation())
.option("kakfa.ssl.truststore.password", sslConfig.truststorePassword())
.option("kafka.ssl.keystore.location", sslConfig.keystoreLocation())
.option("kafka.ssl.keystore.password", sslConfig.keystorePassword())
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("failOnDataLoss", "true");
我不确定为什么 batch
Spark Structured Streaming with Kafka 现在仍然存在。如果您想使用它,那么您必须编写自己的 Offset management
。看攻略,但是解释的很烂
我会说 Trigger.Once
对您来说是一个更好的用例; Offset management
由 Spark 提供,因此它不是批处理模式。