结构化流 - 无法使用 FileContext API 管理 AWS S3 上的元数据日志文件
Structured Streaming - Could not use FileContext API for managing metadata log files on AWS S3
我在 Spark(v2.2.0) 中有一个 StreamingQuery,即
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("parquet")
.option("checkpointLocation", "s3n://bucket/checkpoint/test")
.option("path", "s3n://bucket/test")
.start()
当我 运行 query
时,数据会保存在 AWS S3 上,并在 s3n://bucket/checkpoint/test
处创建检查点。但是,我还在日志中收到以下 WARNING:
WARN [o.a.s.s.e.streaming.OffsetSeqLog] Could not use FileContext API for managing metadata log files at path s3n://bucket/checpoint/test/offsets. Using FileSystem API instead for managing log files. The log may be inconsistent under failures.
我无法理解为什么会出现此 警告。另外,如果出现任何故障,我的检查点是否会不一致?
谁能帮我解决一下?
查看源代码,这个错误来自HDFSMetadataLog class。代码中的注释指出:
Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing files in a directory always shows the latest files.
所以问题是由于使用 AWS S3,它会强制您使用 FileSystemManager
API。检查 class 的评论,我们看到
Implementation of FileManager using older FileSystem API. Note that this implementation cannot provide atomic renaming of paths, hence can lead to consistency issues. This should be used only as a backup option, when FileContextManager cannot be used.
因此,当多个编写器想要同时进行重命名操作时,可能会出现一些问题。有一个相关的工单 here,但是,由于无法在 Spark 中修复该问题,因此该工单已关闭。
如果需要在 S3 上设置检查点,需要考虑的一些事项:
- 为了避免警告和潜在的麻烦,检查点到 HDFS,然后复制结果
- 到 S3 的检查点,但检查点之间有很长的间隔。
- 没有人应该使用 S3n 作为连接器。它已过时并从 Hadoop 3 中删除。如果您有 Hadoop 2.7.x 类路径上的 JAR,请使用 s3a
- rename() 的问题不仅在于一致性,而且文件越大,花费的时间越长。
对象存储的检查点确实需要以不同的方式完成。如果仔细观察,没有 rename()
,但是现有的代码却期望它是一个 O(1) 原子操作。
我在 Spark(v2.2.0) 中有一个 StreamingQuery,即
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "test")
.load()
val query = df
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("parquet")
.option("checkpointLocation", "s3n://bucket/checkpoint/test")
.option("path", "s3n://bucket/test")
.start()
当我 运行 query
时,数据会保存在 AWS S3 上,并在 s3n://bucket/checkpoint/test
处创建检查点。但是,我还在日志中收到以下 WARNING:
WARN [o.a.s.s.e.streaming.OffsetSeqLog] Could not use FileContext API for managing metadata log files at path s3n://bucket/checpoint/test/offsets. Using FileSystem API instead for managing log files. The log may be inconsistent under failures.
我无法理解为什么会出现此 警告。另外,如果出现任何故障,我的检查点是否会不一致?
谁能帮我解决一下?
查看源代码,这个错误来自HDFSMetadataLog class。代码中的注释指出:
Note: [[HDFSMetadataLog]] doesn't support S3-like file systems as they don't guarantee listing files in a directory always shows the latest files.
所以问题是由于使用 AWS S3,它会强制您使用 FileSystemManager
API。检查 class 的评论,我们看到
Implementation of FileManager using older FileSystem API. Note that this implementation cannot provide atomic renaming of paths, hence can lead to consistency issues. This should be used only as a backup option, when FileContextManager cannot be used.
因此,当多个编写器想要同时进行重命名操作时,可能会出现一些问题。有一个相关的工单 here,但是,由于无法在 Spark 中修复该问题,因此该工单已关闭。
如果需要在 S3 上设置检查点,需要考虑的一些事项:
- 为了避免警告和潜在的麻烦,检查点到 HDFS,然后复制结果
- 到 S3 的检查点,但检查点之间有很长的间隔。
- 没有人应该使用 S3n 作为连接器。它已过时并从 Hadoop 3 中删除。如果您有 Hadoop 2.7.x 类路径上的 JAR,请使用 s3a
- rename() 的问题不仅在于一致性,而且文件越大,花费的时间越长。
对象存储的检查点确实需要以不同的方式完成。如果仔细观察,没有 rename()
,但是现有的代码却期望它是一个 O(1) 原子操作。