Spark Structured Streaming - 此查询不支持从检查点位置恢复

Spark Structured Streaming - This query does not support recovering from checkpoint location

出于学习目的,我正尝试在检查点上做一些 experiment/test。

但我获得的选项有限,无法查看内部结构的工作情况。我正在尝试从套接字读取。

val lines: DataFrame = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 12345)
  .load()

并用它做一些我需要检查点的状态操作

Q1。使用检查点位置作为我的本地系统,它无法读回检查点并给出错误

This query does not support recovering from checkpoint location. Delete src/testC/offsets to start over.;

它每次都会创建一个新的检查点 运行 查询。如何将我的本地系统用作 testing/experimenting 目的的检查点?

(所以我选择了 hdfs)

Q2。当 hdfs 作为检查点时,它在我的本地系统而不是 hdfs 中创建检查点,如何使其成为 hdfs 的检查点? (顺便说一句,通过了 hdfs 配置)

df.writeStream
  .option("checkpointLocation","/mycheckLoc")
  .option("hdfs_url" -> "hdfs://localhost:9000/hdoop"),
  .option("web_hdfs_url" -> "webhdfs://localhost:9870/hdoop")

Q3。我们是否需要在每个 df.writeStream 选项中提供检查点,即我们也可以传入 spark.sparkContext.setCheckpointDir(checkpointLocation) 对吗?

您收到此错误“此查询不支持从检查点位置恢复”,因为 socket readStream 不是可重播源,因此不允许使用任何检查点。您需要确保在您的 writeStream.

中完全不使用选项 checkpointLocation

通常,您可以使用 file:///path/to/dirhdfs:///path/to/dir.

来区分本地文件系统和 hdfs 位置

确保您的应用程序用户拥有写入和读取这些位置的所有权限。此外,您可能更改了代码库,在这种情况下,应用程序无法从检查点文件中恢复。您可以在 Recovery Semantics after Changes in a Streaming Query 上的结构化流编程指南中阅读结构化流作业中的 允许不允许 更改。

为了让 Spark 知道您的 HDFS,您需要在 Spark 的类路径中包含两个 Hadoop 配置文件:

  • hdfs-site.xml 为 HDFS 客户端提供默认行为;和
  • core-site.xml 设置默认文件系统名称。

通常,它们存储在“/etc/hadoop/conf”中。要使这些文件对 Spark 可见,您需要将 $SPARK_HOME/spark-env.sh 中的 HADOOP_CONF_DIR 设置为包含配置文件的位置。

[出自《Spark - 权威指南》一书]

"Do we need to provide checkpoint in every df.writeStream options, i.e. We can also pass in spark.sparkContext.setCheckpointDir(checkpointLocation) right?"

从理论上讲,您可以为 SQLContext 中的所有查询集中设置检查点位置,但强烈建议为每个流设置唯一的检查点位置。 Structured Streaming in Production 上的 Databricks 博客说:

"This checkpoint location preserves all of the essential information that uniquely identifies a query. Hence, each query must have a different checkpoint location, and multiple queries should never have the same location.

"As a best practice, we recommend that you always specify the checkpointLocation option."