Apache Beam / Flink ExceptionInChainedStubException
I'm using Apache Beam 2.0.0 and the FlinkRunner of the same version (Scala 2.10). I'm testing against an in-process Flink master (the default configuration) with the FlinkRunner dependency, which apparently pulls in Flink 1.2.1 at runtime (per the Maven dependency tree).
When a "user exception" like this occurs, what is the best way to find the actual problem? This isn't a question about what I did wrong this time; it's about how to tell, in general, how to get more information out of Beam or Flink. Here is the stack trace:
Exception in thread "main" java.lang.RuntimeException: Pipeline execution failed
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:122)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:295)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:281)
at com.mapfit.flow.data.environment.MFEnvironment.run(MFEnvironment.java:70)
at com.mapfit.flow.main.Scratch.main(Scratch.java:35)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$$anonfun$applyOrElse.apply$mcV$sp(JobManager.scala:910)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$$anonfun$applyOrElse.apply(JobManager.scala:853)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$$anonfun$applyOrElse.apply(JobManager.scala:853)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.beam.sdk.util.UserCodeException: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
at org.apache.beam.sdk.transforms.MapElements$auxiliary$PCieS8xh.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:118)
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:82)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$MultiDoFnOutputManager.output(FlinkDoFnFunction.java:165)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629)
at org.apache.beam.sdk.transforms.MapElements.processElement(MapElements.java:122)
Note that nothing in the trace relates to any code I wrote (other than my call to pipeline.run()).
I downloaded the source for each of the linked jars and stepped into ChainedFlatMapDriver, which raises the exception at line 82. Eventually I saw an EOFException generated by calls during object serialization (my values use default coders). I thought I was onto something, but the cause of the EOFException seems to lie in SimpleCollectingOutputView at line 79, where it is thrown a lot and routinely swallowed, since that appears to be part of Flink's normal execution.
Does anyone know how to get Flink to expose information about the failure?
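In the meantime, one generic trick for traces like this is to catch the RuntimeException around pipeline.run() and walk the cause chain yourself, so the innermost failure is visible even when the top-level message is just "Pipeline execution failed". A minimal sketch (plain Java, not a Beam or Flink API; the class and method names are my own):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: flatten a wrapped exception into the list of its
// causes, innermost last, so the root failure can be inspected directly.
public class CauseChain {
    public static List<Throwable> unwrap(Throwable t) {
        List<Throwable> chain = new ArrayList<>();
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            chain.add(cur);
            if (cur.getCause() == cur) break; // guard against self-referential causes
        }
        return chain;
    }

    public static void main(String[] args) {
        // Simulated nesting, mirroring the trace above:
        Throwable root = new java.io.EOFException("buffer exhausted");
        Throwable mid = new RuntimeException("ExceptionInChainedStubException", root);
        Throwable top = new RuntimeException("Pipeline execution failed", mid);
        for (Throwable t : unwrap(top)) {
            System.out.println(t.getClass().getName() + ": " + t.getMessage());
        }
    }
}
```

Wrapped around pipeline.run(), this at least prints every layer of the chain instead of only the outermost one.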
Just found more info after walking through more Flink code in the debugger:
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:46)
at org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction.flatMap(FlinkMultiOutputPruningFunction.java:30)
at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction$MultiDoFnOutputManager.output(FlinkDoFnFunction.java:165)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnContext.outputWindowedValue(SimpleDoFnRunner.java:355)
at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:629)
at org.apache.beam.sdk.transforms.MapElements.processElement(MapElements.java:122)
at org.apache.beam.sdk.transforms.MapElements$auxiliary$vuuNRtio.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:197)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:158)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:65)
at org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:118)
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:665)
at java.lang.Thread.run(Thread.java:745)
Take a look at this link:
https://issues.apache.org/jira/browse/BEAM-2831
I used to see a similar exception when running Beam with the FlinkRunner on YARN. The coder proposed on the issue page helped.
Other than that, I'd suggest using loggers extensively until your pipeline runs smoothly. On YARN, the logs can be retrieved with the yarn logs command. I don't know about your setup (in-process Flink master), but you should be able to write some logs too, I assume...
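For example, a minimal sketch of what per-element logging might look like. This uses java.util.logging to stay self-contained; in a real Beam job you would typically log through SLF4J inside your DoFn, and the class name, element type, and transformation here are purely illustrative:

```java
import java.util.logging.Logger;

// Illustrative stand-in for the body of a MapElements/DoFn function: log each
// element before and after the transformation, so the last logged element
// points at the record that triggered the failure.
public class LoggingMapFn {
    private static final Logger LOG = Logger.getLogger(LoggingMapFn.class.getName());

    public static String apply(String element) {
        LOG.fine(() -> "processing element: " + element);
        String result = element.trim().toLowerCase(); // placeholder transformation
        LOG.fine(() -> "produced: " + result);
        return result;
    }
}
```

With the log level turned up during debugging, the element logged immediately before the ExceptionInChainedStubException is usually the one whose serialization failed.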