无法使用 Flink 处理器恢复检查点状态 API
Failed to restore checkpointed states using Flink Processor API
主程序正在消费kafka事件,然后filter -> map -> keyBy -> CEP -> sink。我写了另一个单独的简单程序来读取检查点目录,如下所示:
object StateReader extends App {
val path = "file://...."
val env = ExecutionEnvironment.getExecutionEnvironment
val chk = Savepoint.load(env.getJavaEnv, path, new FsStateBackend(path))
val ds = chk.readKeyedState("cep", new CepOperatorReadFunction, TypeInformation.of(classOf[KEY]), TypeInformation.of(classOf[VALUE]))
println(ds.count())
}
class CepOperatorReadFunction extends KeyedStateReaderFunction[KEY, VALUE] {
override def open(parameters: Configuration): Unit = {
}
override def readKey(k: KEY, context: KeyedStateReaderFunction.Context, collector: Collector[VALUE]): Unit = {
}//end readKey
}//end class CepOperatorReadFunction
但是我遇到了以下异常:
Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:120)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 13 more
以下是flink-conf.yaml
中的一些配置
state.backend: rocksdb
state.checkpoints.dir: hdfs:///.../checkpoints
state.savepoints.dir: hdfs:///.../savepoints
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.localdir: /var/lib/.../rocksdb
execution.checkpointing.interval: 900000
execution.checkpointing.timeout: 600000
execution.checkpointing.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
知道异常发生的原因以及如何解决问题吗?
谢谢
没有 out-of-the-box 支持使读取 CEP 操作员的状态变得容易。因此,要实施您的 KeyedStateReaderFunction
,您必须深入研究 CEP 实施,找到使用的 ValueState
s 和 MapState
s,并实施使用的 reader那些相同的状态描述符。
主程序正在消费kafka事件,然后filter -> map -> keyBy -> CEP -> sink。我写了另一个单独的简单程序来读取检查点目录,如下所示:
object StateReader extends App {
val path = "file://...."
val env = ExecutionEnvironment.getExecutionEnvironment
val chk = Savepoint.load(env.getJavaEnv, path, new FsStateBackend(path))
val ds = chk.readKeyedState("cep", new CepOperatorReadFunction, TypeInformation.of(classOf[KEY]), TypeInformation.of(classOf[VALUE]))
println(ds.count())
}
class CepOperatorReadFunction extends KeyedStateReaderFunction[KEY, VALUE] {
override def open(parameters: Configuration): Unit = {
}
override def readKey(k: KEY, context: KeyedStateReaderFunction.Context, collector: Collector[VALUE]): Unit = {
}//end readKey
}//end class CepOperatorReadFunction
但是我遇到了以下异常:
Caused by: java.lang.IllegalStateException: Unexpected state handle type, expected: class org.apache.flink.runtime.state.KeyGroupsStateHandle, but found: class org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:120)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:114)
... 13 more
以下是flink-conf.yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///.../checkpoints
state.savepoints.dir: hdfs:///.../savepoints
state.backend.incremental: true
state.backend.rocksdb.memory.write-buffer-ratio: 0.6
state.backend.rocksdb.localdir: /var/lib/.../rocksdb
execution.checkpointing.interval: 900000
execution.checkpointing.timeout: 600000
execution.checkpointing.unaligned: true
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
知道异常发生的原因以及如何解决问题吗?
谢谢
没有 out-of-the-box 支持使读取 CEP 操作员的状态变得容易。因此,要实施您的 KeyedStateReaderFunction
,您必须深入研究 CEP 实施,找到使用的 ValueState
s 和 MapState
s,并实施使用的 reader那些相同的状态描述符。