Flink job throws ClassNotFoundException on job restart
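I run Flink on YARN with two TaskManagers. I wrote a simple job that consumes messages from Kafka. The job runs on TaskManager 1. When I kill TaskManager 1 (via kill PID), the job is restarted on TaskManager 2. So far, so good. But once the consumer starts, execution fails with: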
java.lang.RuntimeException: Could not deserialize NFA.
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:86)
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:31)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getPartitionableState(DefaultOperatorStateBackend.java:107)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.initializeState(FlinkKafkaConsumerBase.java:323)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:396)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.api.java.typeutils.runtime.JavaSerializer.deserialize(JavaSerializer.java:83)
... 8 more
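The trace offers a clue: the failing lookup runs through `ObjectInputStream.resolveClass` into `Class.forName` and ends in `sun.misc.Launcher$AppClassLoader`, the JVM's system class loader. `KafkaTopicPartition`, however, ships with the Kafka connector, which is typically bundled inside the job jar rather than Flink's own classpath (an assumption about this build). One plausible reading, not confirmed in the post, is that the restore path deserialized operator state without using the job's user-code class loader. That would also fit the observation that the first attempt works: with no checkpointed state to restore, `initializeState` has nothing to deserialize. The sketch below shows the general technique of resolving classes against an explicitly supplied loader; `ClassLoaderObjectInputStream`, `serialize` and `deserialize` are hypothetical names, not Flink's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch: an ObjectInputStream that resolves classes against an
 * explicitly supplied class loader (for example a job's user-code class loader)
 * instead of relying on ObjectInputStream's default lookup.
 */
class ClassLoaderObjectInputStream extends ObjectInputStream {
    private final ClassLoader classLoader;

    ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader) throws IOException {
        super(in);
        this.classLoader = classLoader;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc)
            throws IOException, ClassNotFoundException {
        try {
            // Ask the supplied loader first; 'false' skips static initialization.
            return Class.forName(desc.getName(), false, classLoader);
        } catch (ClassNotFoundException e) {
            // Fall back to the default lookup (this also covers primitive types).
            return super.resolveClass(desc);
        }
    }

    /** Serializes an object with plain Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserializes bytes, resolving every class through the given loader. */
    static Object deserialize(byte[] data, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                 new ClassLoaderObjectInputStream(new ByteArrayInputStream(data), loader)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> state = new ArrayList<>(Arrays.asList("topic-0", "topic-1"));
        // In a real job the loader would be the user-code class loader (assumption);
        // the system loader is used here only so the example runs standalone.
        Object restored = deserialize(serialize(state), ClassLoader.getSystemClassLoader());
        System.out.println(restored);
    }
}
```

The fallback to `super.resolveClass` keeps the default behaviour for anything the supplied loader cannot find, so the stream never does worse than a plain `ObjectInputStream`.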
I build the jar file with:
mvn clean package -Pbuild-jar
I also tried this, but it made no difference:
mvn clean package
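The strange thing is that my job runs fine on the first attempt but fails with a CNFE (ClassNotFoundException) when the job is restarted. What am I doing wrong? (I'm using Flink 1.2-SNAPSHOT because I need the BucketingSink.) I compared the classpaths of both TaskManagers and they are identical.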
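Identical TaskManager classpaths only describe what the system class loader can see. Classes packed into the job jar, such as the Kafka connector classes when they are bundled there (an assumption about this build), are loaded through a separate user-code class loader, so matching classpaths do not by themselves rule out this error. A small diagnostic along these lines, using the hypothetical class `ClassVisibility`, checks which of several loaders can locate a class without initializing it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical diagnostic: report which class loaders can see a given class. */
final class ClassVisibility {
    private ClassVisibility() {}

    /** Returns true if 'loader' can locate 'className' (static initializers are not run). */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Checks the class against each named loader, keeping the caller's order. */
    static Map<String, Boolean> report(String className, Map<String, ClassLoader> loaders) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (Map.Entry<String, ClassLoader> entry : loaders.entrySet()) {
            result.put(entry.getKey(), isVisible(className, entry.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, ClassLoader> loaders = new LinkedHashMap<>();
        loaders.put("system", ClassLoader.getSystemClassLoader());
        // Assumption: inside a running task, the thread's context class loader
        // is the job's user-code loader; standalone it is usually the system loader.
        loaders.put("context", Thread.currentThread().getContextClassLoader());
        String name = "org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition";
        System.out.println(report(name, loaders));
    }
}
```

Logged from inside an operator on each TaskManager, a result where only the user-code loader reports `true` would be consistent with the class living solely in the job jar.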
For completeness: this was a temporary issue and has since been fixed.