How to debug Flink Task Issues

Hi, I am trying to run a Flink Scala application that reads from Kafka, applies some lookup transformations, and then writes to Kafka.

Flink version: 1.12.1

I tested it locally and it works fine. But when I run it on a cluster using the native Kubernetes integration, I get the strange error below.

The cluster itself seems fine: I ran the WordCount example on it and it worked.

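The exception is not very informative, and the stack trace only shows TaskManager frames, so I can't tell where in my application the problem might be. Could this be a serialization problem? Is there a way to debug this kind of issue and find the actual place in the application code that causes it?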

```
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Could not instantiate serializer.
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createStreamOutput(OperatorChain.java:664) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainOutputs(OperatorChain.java:250) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:160) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) [flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.io.IOException: unexpected exception type
        at java.io.ObjectStreamClass.throwMiscException(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
Caused by: java.util.concurrent.ExecutionException: java.lang.ClassNotFoundException: __wrapperaa8fcbe22114421a688e120fcde1df7.__wrapperaa8fcbe22114421a688e120fcde1df7$
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.waitForValue(LocalCache.java:3557) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.waitForLoadingValue(LocalCache.java:2302) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2289) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
Caused by: java.lang.ClassNotFoundException: __wrapperaa8fcbe22114421a688e120fcde1df7.__wrapperaa8fcbe22114421a688e120fcde1df7$
        at scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:64) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
        at java.lang.Class.forName0(Native Method) ~[?:?]
        at java.lang.Class.forName(Unknown Source) ~[?:?]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$ToolBoxGlobal.compile(ToolBoxFactory.scala:261) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.$anonfun$compile(ToolBoxFactory.scala:433) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl$withCompilerApi$.apply(ToolBoxFactory.scala:359) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at scala.tools.reflect.ToolBoxFactory$ToolBoxImpl.compile(ToolBoxFactory.scala:426) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.compileCbfInternal(TraversableSerializer.scala:230) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:220) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$LazyRuntimeCompiler.call(TraversableSerializer.scala:216) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.load(LocalCache.java:4742) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.api.scala.typeutils.TraversableSerializer.readObject(TraversableSerializer.scala:72) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at jdk.internal.reflect.GeneratedMethodAccessor77.invoke(Unknown Source) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) ~[?:?]
        at java.lang.reflect.Method.invoke(Unknown Source) ~[?:?]
        at java.io.ObjectStreamClass.invokeReadObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readArray(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.defaultReadFields(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readSerialData(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject0(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at java.io.ObjectInputStream.readObject(Unknown Source) ~[?:?]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        at org.apache.flink.streaming.api.graph.StreamConfig.getTypeSerializerOut(StreamConfig.java:214) ~[flink-dist_2.12-1.12-SNAPSHOT.jar:1.12-SNAPSHOT]
        ... 8 more
```
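One way to narrow this kind of failure down outside the cluster: the trace shows the TaskManager failing inside `InstantiationUtil.deserializeObject`, i.e. while it Java-deserializes the operator's output serializer from `StreamConfig` and a class loader cannot find a class it needs. The sketch below imitates that step in plain Java (callable from Scala): it serializes an object and deserializes it through an explicitly chosen `ClassLoader`, returning the name of the class that could not be resolved. `SerializationCheck`, `Payload`, and `findMissingClass` are hypothetical names; this is a simplified stand-in, not Flink's own `InstantiationUtil`, it does not handle serialized primitive `Class` objects, and it will not reproduce the runtime-generated `__wrapper` class from the trace. It only shows how the same bytes can deserialize under one loader and fail under another.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;

public class SerializationCheck {

    // Hypothetical stand-in for a user type that exists only in the job jar.
    static class Payload implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<String> values = new ArrayList<>();
    }

    // ObjectInputStream that resolves every class through one explicit ClassLoader.
    static class LoaderObjectInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderObjectInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws ClassNotFoundException {
            // initialize = false: load the class without running its static initializers.
            return Class.forName(desc.getName(), false, loader);
        }
    }

    // Java-serializes obj, then deserializes the bytes through the given loader.
    static Object roundTrip(Serializable obj, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new LoaderObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return in.readObject();
        }
    }

    // Returns null if obj survives the round trip, else the name of the class the loader could not find.
    static String findMissingClass(Serializable obj, ClassLoader loader) {
        try {
            roundTrip(obj, loader);
            return null;
        } catch (ClassNotFoundException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Payload payload = new Payload();
        payload.values.add("lookup-key");

        // The loader that defined Payload can resolve it.
        System.out.println(findMissingClass(payload, SerializationCheck.class.getClassLoader()));

        // A loader whose parent is the bootstrap loader (null) cannot see application classes.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println(findMissingClass(payload, isolated));
        }
    }
}
```

With the class loader that defined `Payload`, the round trip succeeds and `null` is printed; with a loader that only delegates to the bootstrap loader, `Payload` cannot be resolved and its name is returned instead of a deeply nested exception.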

I suspect the problem was the class loading order.

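I was deploying in application mode. Previously, my user-code jar and its dependency libraries were in /opt/flink/usrlib: I followed this guide https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/resource-providers/native_kubernetes/#application-mode and put the jar into /usrlib. That setup produced the error above.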

I have now switched to /opt/flink/lib, so the user code and its dependencies sit in the same directory as the Flink libraries inside the Docker image.

I changed `COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/usrlib/poc-flink.jar` to `COPY .dist/poc-flink-0.0.1.jar $FLINK_HOME/lib/poc-flink.jar`, so that the same class loader is used for both. That solved the problem.
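To check which class loader actually defines a given class, and therefore whether moving the jar from `usrlib` to `lib` changed anything, a small helper can walk the class-loader chain. The sketch below is plain Java callable from Scala; `ClassLoaderReport`, `loaderChain`, and `sameLoader` are hypothetical names. Inside a Flink rich function one could log `loaderChain` for one of the job's own record types next to `getRuntimeContext().getUserCodeClassLoader()`. The exact loader class names printed depend on the JDK, the Flink version, and the deployment, so treat them as something to compare between the two setups rather than as fixed values.

```java
import java.util.ArrayList;
import java.util.List;

public class ClassLoaderReport {

    // Class-loader chain for cls: its defining loader first, then each parent, ending at the bootstrap loader.
    static List<String> loaderChain(Class<?> cls) {
        List<String> chain = new ArrayList<>();
        ClassLoader loader = cls.getClassLoader(); // null means the bootstrap loader
        while (loader != null) {
            chain.add(loader.getClass().getName());
            loader = loader.getParent();
        }
        chain.add("<bootstrap>");
        return chain;
    }

    // True if both classes were defined by the very same ClassLoader instance.
    static boolean sameLoader(Class<?> a, Class<?> b) {
        return a.getClassLoader() == b.getClassLoader();
    }

    public static void main(String[] args) {
        // JDK core classes are defined by the bootstrap loader: prints [<bootstrap>]
        System.out.println(loaderChain(String.class));
        // An application class: typically the application loader followed by its parents.
        System.out.println(loaderChain(ClassLoaderReport.class));
        // false: the two classes come from different loaders.
        System.out.println(sameLoader(ClassLoaderReport.class, String.class));
    }
}
```

With the jar in `/opt/flink/lib`, the job's own classes and Flink classes such as `TraversableSerializer` would be expected to report the same defining loader (`sameLoader` returns `true`); with the jar in `usrlib`, the job's classes would be expected to come from a separate user-code class loader.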