Flink 状态处理器 API 与 EmbeddedRocksDBStateBackend

Flink State Processor API with EmbeddedRocksDBStateBackend

我正在尝试使用此示例创建一个保存点 https://gist.github.com/alpinegizmo/ff3d2e748287853c88f21259830b29cf

但不是

.create(new FsStateBackend("file:///tmp/checkpoints"), 256)

我需要 RocksDBStateBackend。由于在 1.13.1 中 RocksDBStateBackend 已被弃用,我必须使用 EmbeddedRocksDBStateBackend。但是

.create(new EmbeddedRocksDBStateBackend(), 256)

不起作用。这是错误:

org.apache.flink.util.FlinkException: Application failed unexpectedly.
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAndShutdownClusterAsync[=13=](ApplicationDispatcherBootstrap.java:170)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
        at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
        at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990)
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:257
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.lambda$runApplicationAsync(ApplicationDispatcherBootstrap.java:212)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:159)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.\n\tat java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:957)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        ... 13 more\nCaused by: org.apache.flink.client.deployment.application.ApplicationExecutionException: Could not execute application.
        ... 11 more\nCaused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:242)
        ... 10 more\nCaused by: java.lang.IllegalStateException\n\tat org.apache.flink.util.Preconditions.checkState(Preconditions.java:177)
        tat org.apache.flink.state.api.output.OperatorSubtaskStateReducer.<init>(OperatorSubtaskStateReducer.java:50)
        at org.apache.flink.state.api.BootstrapTransformation.writeOperatorState(BootstrapTransformation.java:146)
        at org.apache.flink.state.api.WritableSavepoint.lambda$writeOperatorStates[=13=](WritableSavepoint.java:134)
        at java.util.stream.ReferencePipeline.accept(ReferencePipeline.java:193)
        at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:546)
        at org.apache.flink.state.api.WritableSavepoint.writeOperatorStates(WritableSavepoint.java:139)
        at org.apache.flink.state.api.WritableSavepoint.write(WritableSavepoint.java:99)
        at CreateSavepointJob.main(CreateSavepointJob.java:157)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        ... 13 more

我做错了什么?

谢谢

我刚刚修改了那个示例以使用 EmbeddedRocksDBStateBackend,它运行良好。

你的项目中需要有这个依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

但是,我怀疑问题出在您未共享的内容上。