Flink State Schema 迁移
Flink State Schema Migration
我在使用 MemoryStateBackend 的独立集群上有一个 flink 流应用程序。 Kryo 的 TaggedFieldSerializer 被用作默认序列化器。
当我更改状态模式并重新部署应用程序时,出现以下异常
Caused by: org.apache.flink.util.StateMigrationException: State migration isn't supported, yet.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:209)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:142)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createValueState(HeapKeyedStateBackend.java:234)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.createValueState(AbstractKeyedStateBackend.java:315)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:312)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:392)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
如果有人建议我解决这个问题或者我应该使用 FsStateBackend 来解决这个问题,那将非常有帮助。
P.S。如果我想在独立集群上为我的 flink 应用程序 运行 使用 S3 上的 FsStateBackend,必须进行哪些配置更改。
我在使用 MemoryStateBackend 的独立集群上有一个 flink 流应用程序。 Kryo 的 TaggedFieldSerializer 被用作默认序列化器。
当我更改状态模式并重新部署应用程序时,出现以下异常
Caused by: org.apache.flink.util.StateMigrationException: State migration isn't supported, yet.
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:209)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:142)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createValueState(HeapKeyedStateBackend.java:234)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.createValueState(AbstractKeyedStateBackend.java:315)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:128)
at org.apache.flink.api.common.state.ValueStateDescriptor.bind(ValueStateDescriptor.java:35)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:312)
at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:392)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
如果有人建议我解决这个问题或者我应该使用 FsStateBackend 来解决这个问题,那将非常有帮助。
P.S。如果我想在独立集群上为我的 flink 应用程序 运行 使用 S3 上的 FsStateBackend,必须进行哪些配置更改。