如何将 Spark Streaming Checkpoint Location 存储到 S3 中?
How to store Spark Streaming Checkpoint Location into S3?
我对一个 Spark Streaming 应用程序 (Spark v2.3.2) 很感兴趣,它可以获取 S3 镶木地板数据并将镶木地板数据写入 S3。应用程序的数据帧流利用 groupByKey()
和 flatMapGroupsWithState()
来利用 GroupState
.
是否可以将其配置为使用 s3 检查点位置?例如:
val stream = myDataset.writeStream
.format("parquet")
.option("path", s3DataDestination)
.option("checkpointLocation", s3CheckpointPath)
.option("truncate", false)
.option(Trigger.Once)
.outputMode(OutputMode.Append)
stream.start().awaitTermination()
我确认以上是能够成功写入数据到s3DataDestination
.
但是写入s3检查点位置时抛出异常:
java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta
这是否需要自定义实现 S3 StateStoreProvider
?或者,检查点位置是否需要写入HDFS?
问题是读写并发频率过高。
AWS S3 不提供此类功能。
解决方案:
- 我们必须切换到本地挂载的永久磁盘以进行 Spark 检查指向
- S3Guard : 这将使 S3 读写更加一致(注意:它是实验性的,我个人从未在实际中看到它)
- 使用 HDFS
我对一个 Spark Streaming 应用程序 (Spark v2.3.2) 很感兴趣,它可以获取 S3 镶木地板数据并将镶木地板数据写入 S3。应用程序的数据帧流利用 groupByKey()
和 flatMapGroupsWithState()
来利用 GroupState
.
是否可以将其配置为使用 s3 检查点位置?例如:
val stream = myDataset.writeStream
.format("parquet")
.option("path", s3DataDestination)
.option("checkpointLocation", s3CheckpointPath)
.option("truncate", false)
.option(Trigger.Once)
.outputMode(OutputMode.Append)
stream.start().awaitTermination()
我确认以上是能够成功写入数据到s3DataDestination
.
但是写入s3检查点位置时抛出异常:
java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta
这是否需要自定义实现 S3 StateStoreProvider
?或者,检查点位置是否需要写入HDFS?
问题是读写并发频率过高。 AWS S3 不提供此类功能。
解决方案:
- 我们必须切换到本地挂载的永久磁盘以进行 Spark 检查指向
- S3Guard : 这将使 S3 读写更加一致(注意:它是实验性的,我个人从未在实际中看到它)
- 使用 HDFS