在 flink 1.13 中配置 RocksDB
Configure RocksDB in flink 1.13
我在 Flink 1.13 版本中阅读了 EmbeddedRocksDBStateBackend
但是有大小限制,所以我想保留我以前的 Flink 1.11 版本的当前配置,但重点是这种配置 RocksDB 的方式是弃用 (new RocksDBStateBackend("path", true);
).
我已尝试使用 EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true))
的新配置,但出现此错误:
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
从 Java 以编程方式为 flink 1.13 配置 RocksDB 状态后端的最佳方法是什么?
在 Flink 1.13 中,我们重新组织了状态后端,因为旧的方式导致了很多关于事情如何工作的误解。所以这两个问题是分离的:
- 你的工作状态存储在哪里(状态后端)。 (对于 RocksDB,应该配置为使用最快的可用本地磁盘。)
- 检查点的存储位置(检查点存储)。在大多数情况下,这应该是一个分布式文件系统。
对于旧的 API,RocksDB 涉及两个不同文件系统的事实被传递给 RocksDBStateBackend
构造函数的检查点路径掩盖了。因此,该配置已移至其他位置(见下文)。
这个 table 显示了旧状态后端和新状态后端之间的关系(结合检查点存储):
Legacy State Backend
New State Backend + Checkpoint Storage
MemoryStateBackend
HashMapStateBackend + JobManagerCheckpointStorage
FsStateBackend
HashMapStateBackend + FileSystemCheckpointStorage
RocksDBStateBackend
EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage
在您的情况下,您希望将 EmbeddedRocksDBStateBackend
与 FileSystemCheckpointStorage
一起使用。您当前遇到的问题是您在 RocksDB 中使用内存检查点存储(JobManagerCheckpointStorage
),这严重限制了可以检查点的状态。
您可以通过在flink-conf.yaml
中指定一个检查点目录来解决这个问题
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
或在您的代码中
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
有关完整的详细信息,请参阅 Migrating from Legacy Backends 上的文档。
我在 Flink 1.13 版本中阅读了 EmbeddedRocksDBStateBackend
但是有大小限制,所以我想保留我以前的 Flink 1.11 版本的当前配置,但重点是这种配置 RocksDB 的方式是弃用 (new RocksDBStateBackend("path", true);
).
我已尝试使用 EmbeddedRocksDBStateBackend (new EmbeddedRocksDBStateBackend(true))
的新配置,但出现此错误:
java.util.concurrent.ExecutionException: java.io.IOException: Size of the state is larger than the maximum permitted memory-backed state. Size=9126648 , maxSize=5242880 . Consider using a different state backend, like the File System State backend.
从 Java 以编程方式为 flink 1.13 配置 RocksDB 状态后端的最佳方法是什么?
在 Flink 1.13 中,我们重新组织了状态后端,因为旧的方式导致了很多关于事情如何工作的误解。所以这两个问题是分离的:
- 你的工作状态存储在哪里(状态后端)。 (对于 RocksDB,应该配置为使用最快的可用本地磁盘。)
- 检查点的存储位置(检查点存储)。在大多数情况下,这应该是一个分布式文件系统。
对于旧的 API,RocksDB 涉及两个不同文件系统的事实被传递给 RocksDBStateBackend
构造函数的检查点路径掩盖了。因此,该配置已移至其他位置(见下文)。
这个 table 显示了旧状态后端和新状态后端之间的关系(结合检查点存储):
Legacy State Backend | New State Backend + Checkpoint Storage |
---|---|
MemoryStateBackend |
HashMapStateBackend + JobManagerCheckpointStorage |
FsStateBackend |
HashMapStateBackend + FileSystemCheckpointStorage |
RocksDBStateBackend |
EmbeddedRocksDBStateBackend + FileSystemCheckpointStorage |
在您的情况下,您希望将 EmbeddedRocksDBStateBackend
与 FileSystemCheckpointStorage
一起使用。您当前遇到的问题是您在 RocksDB 中使用内存检查点存储(JobManagerCheckpointStorage
),这严重限制了可以检查点的状态。
您可以通过在flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: file:///checkpoint-dir/
# Optional, Flink will automatically default to FileSystemCheckpointStorage
# when a checkpoint directory is specified.
state.checkpoint-storage: filesystem
或在您的代码中
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
// If you manually passed FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as write buffer size,
// you can achieve the same results by using manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));
有关完整的详细信息,请参阅 Migrating from Legacy Backends 上的文档。