Apache Spark(结构化流):S3 检查点支持

Apache Spark (Structured Streaming) : S3 Checkpoint support

来自 spark 结构化流文档: "This checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query."

果然,将检查点设置为 s3 路径会抛出:

17/01/31 21:23:56 ERROR ApplicationMaster: User class threw exception: java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
java.lang.IllegalArgumentException: Wrong FS: s3://xxxx/fact_checkpoints/metadata, expected: hdfs://xxxx:8020 
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:652) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:194) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.access[=10=]0(DistributedFileSystem.java:106) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1305) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.doCall(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) 
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1301) 
        at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1430) 
        at org.apache.spark.sql.execution.streaming.StreamMetadata$.read(StreamMetadata.scala:51) 
        at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:100) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:232) 
        at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:269) 
        at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:262) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor$.main(FactDeviceLogsProcessor.scala:133) 
        at com.roku.dea.spark.streaming.FactDeviceLogsProcessor.main(FactDeviceLogsProcessor.scala) 
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
        at java.lang.reflect.Method.invoke(Method.java:498) 
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon.run(ApplicationMaster.scala:637) 
17/01/31 21:23:56 INFO SparkContext: Invoking stop() from shutdown hook 

这里有几个问题:

  1. 为什么不支持 s3 作为检查点目录(常规 Spark 流支持此功能)?什么使文件系统 "HDFS compliant" ?
  2. 我临时使用 HDFS(因为集群随时可能出现或出现故障)并使用 s3 作为保存所有数据的地方 - 在这种设置中存储结构化流数据的检查点数据的建议是什么?

这是一个已知问题:https://issues.apache.org/jira/browse/SPARK-19407

应在下一版本中修复。您可以使用 --conf spark.hadoop.fs.defaultFS=s3 将默认文件系统设置为 s3 作为解决方法。

为什么 FS HDFS "compliant?" 它是一个文件系统,具有 Hadoop FS specification 中指定的行为。 object store 和 FS 的区别都在里面,重点是 "eventually consistent object stores without append or O(1) atomic renames are not compliant"

特别是对于 S3

  1. 不一致:创建新的 blob 后,列表命令通常不会显示它。删除相同。
  2. 当一个 blob 被覆盖或删除时,它可能需要一段时间才能消失
  3. rename()是先复制再删除的实现

通过将所有内容保存到一个位置然后将其重命名为检查点目录来激发流式检查点。这使得到检查点的时间与在 S3 中复制数据的时间成正比,即 ~6-10 MB/s。

当前的流代码位不适合 s3

现在,执行以下操作之一

  • 检查点到 HDFS,然后复制结果
  • 检查点到分配并附加到您的集群的一些 EBS
  • 到 S3 的检查点,但检查点之间有很长的间隔,因此到检查点的时间不会导致您的流式应用程序宕机。

如果您使用的是 EMR,则可以支付额外费用购买一致的、由 dynamo DB 支持的 S3,这会给您带来更好的一致性。但复制时间仍然相同,所以检查点将同样慢

此问题已在 https://issues.apache.org/jira/browse/SPARK-19407 中修复。

然而,由于 S3 中缺乏最终一致性,结构化流式检查点在 S3 中运行不佳。使用 S3 进行检查点不是一个好主意 https://issues.apache.org/jira/browse/SPARK-19013.

Micheal Armburst 表示这不会在 Spark 中修复,解决方案是等待 S3guard 实现。 S3Guard 有一段时间了。

编辑:自 post 完成后有 2 项发展 a) 对 S3Guard 的支持已合并到 Spark 3.0 中。 b) AWS 使 S3 立即一致。

您可以使用 s3 作为检查点,但您应该启用 EMRFS,以便处理 s3 一致性。

是的,如果您使用的是 Spark Structured Streaming 版本 3 或更高版本。首先,创建一个 SparkSession 并将 S3 配置添加到其上下文中。

val sparkSession = SparkSession
    .builder()
    .master(sparkMasterUrl)
    .appName(appName)
    .getOrCreate()

sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "accessKey")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "secretKey")
sparkSession.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "http://s3URL:s3Port")
sparkSession.sparkContext.hadoopConfiguration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

稍后,在开始查询之前添加 checkpointLocation 使用 S3 存储桶的配置。例如:

val streamingQuery = streamingDF.writeStream
    .option("checkpointLocation", "s3a://bucketName/checkpointDir/")
    .foreachBatch{(batchDF: DataFrame, batchId: Long) =>
       // Transform and write batchDF
     }.start()

streamingQuery.awaitTermination()