如何将 Spark Streaming Checkpoint Location 存储到 S3 中?

How to store Spark Streaming Checkpoint Location into S3?

我对一个 Spark Streaming 应用程序 (Spark v2.3.2) 很感兴趣,它可以获取 S3 镶木地板数据并将镶木地板数据写入 S3。应用程序的数据帧流利用 groupByKey()flatMapGroupsWithState() 来利用 GroupState.

是否可以将其配置为使用 s3 检查点位置?例如:

val stream = myDataset.writeStream
    .format("parquet")
    .option("path", s3DataDestination)
    .option("checkpointLocation", s3CheckpointPath)
    .option("truncate", false)
    .option(Trigger.Once)
    .outputMode(OutputMode.Append)
stream.start().awaitTermination()

我确认以上是能够成功写入数据到s3DataDestination.

但是写入s3检查点位置时抛出异常:

java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta

这是否需要自定义实现 S3 StateStoreProvider?或者,检查点位置是否需要写入HDFS?

问题是读写并发频率过高。 AWS S3 不提供此类功能。

解决方案:

  • 我们必须切换到本地挂载的永久磁盘以进行 Spark 检查指向
  • S3Guard : 这将使 S3 读写更加一致(注意:它是实验性的,我个人从未在实际中看到它)
  • 使用 HDFS