在 flink 1.13 中配置 RocksDB

Configure RocksDB in flink 1.13

我在 Flink 1.13 版本中阅读了 EmbeddedRocksDBStateBackend 但是有大小限制,所以我想保留我以前的 Flink 1.11 版本的当前配置,但重点是这种配置 RocksDB 的方式是弃用 (new RocksDBStateBackend("path", true);).

我已尝试使用 EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true)) 的新配置,但出现此错误:

java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.

从 Java 以编程方式为 flink 1.13 配置 RocksDB 状态后端的最佳方法是什么?

在 Flink 1.13 中,我们重新组织了状态后端,因为旧的方式导致了很多关于事情如何工作的误解。所以这两个问题是分离的:

  1. 你的工作状态存储在哪里(状态后端)。 (对于 RocksDB,应该配置为使用最快的可用本地磁盘。)
  2. 检查点的存储位置(检查点存储)。在大多数情况下,这应该是一个分布式文件系统。

对于旧的 API,RocksDB 涉及两个不同文件系统的事实被传递给 RocksDBStateBackend 构造函数的检查点路径掩盖了。因此,该配置已移至其他位置(见下文)。

这个 table 显示了旧状态后端和新状态后端之间的关系(结合检查点存储):

Legacy State Backend New State Backend + Checkpoint Storage
MemoryStateBackend HashMapStateBackend + JobManagerCheckpointStorage
FsStateBackend HashMapStateBackend + FileSystemCheckpointStorage
RocksDBStateBackend EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage

在您的情况下,您希望将 EmbeddedRocksDBStateBackendFileSystemCheckpointStorage 一起使用。您当前遇到的问题是您在 RocksDB 中使用内存检查点存储(JobManagerCheckpointStorage),这严重限制了可以检查点的状态。

您可以通过在flink-conf.yaml

中指定一个检查点目录来解决这个问题
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/

# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem

或在您的代码中

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

有关完整的详细信息,请参阅 Migrating from Legacy Backends 上的文档。