Interrupted while joining ioThread / Error during disposal of stream operator in flink application

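I have a Flink-based streaming application that uses an Apache Kafka source and sink. For a few days now I have been getting exceptions at random during development, and I don't know where they come from.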

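I am running the application in IntelliJ via a mainRunner class and feeding it messages through Kafka. Sometimes the very first message triggers the error; sometimes it only happens after a few messages.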

This is what it looks like:

16:31:01.935 ERROR o.a.k.c.producer.KafkaProducer      - Interrupted while joining ioThread
java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_51]
    at java.lang.Thread.join(Thread.java:1253) [na:1.8.0_51]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) [kafka-clients-0.11.0.2.jar:na]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) [flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:662) [flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) [flink-core-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
16:31:01.936 ERROR o.a.f.s.runtime.tasks.StreamTask    - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1062) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1010) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:989) ~[kafka-clients-0.11.0.2.jar:na]
    at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer.close(FlinkKafkaProducer.java:168) ~[flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.close(FlinkKafkaProducer011.java:662) ~[flink-connector-kafka-0.11_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43) ~[flink-core-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:117) ~[flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:477) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:378) [flink-streaming-java_2.11-1.6.1.jar:1.6.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711) [flink-runtime_2.11-1.6.1.jar:1.6.1]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_51]
Caused by: java.lang.InterruptedException: null
    at java.lang.Object.wait(Native Method) ~[na:1.8.0_51]
    at java.lang.Thread.join(Thread.java:1253) [na:1.8.0_51]
    at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:1031) ~[kafka-clients-0.11.0.2.jar:na]
    ... 10 common frames omitted
16:31:01.938 ERROR o.a.k.c.producer.KafkaProducer      - Interrupted while joining ioThread

I get about 10-20 of these; then Flink seems to recover the application, it becomes available again, and I can process messages successfully.

What could be causing this? Or how can I analyze it further to track it down?

I am using Flink 1.6.1 with Scala 2.11 on a Mac, with IntelliJ version 2018.3.2.

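I was able to solve it. It turned out that one of my stream operators (a map function) was throwing an exception because of an invalid array index.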
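A minimal sketch of the kind of fix involved, assuming (hypothetically) that the map function splits a comma-separated record and reads a field by index. This is not the author's code: the record format, `FIELD_INDEX`, `fieldAt` and `mapRecord` are illustrative names. In a real Flink job the body of `mapRecord` would sit inside the `map` method of an `org.apache.flink.api.common.functions.MapFunction`; here it is plain Java so it runs on its own.

```java
import java.util.Optional;

public class SafeFieldAccess {

    // Hypothetical: index of the field the map operator needs.
    public static final int FIELD_INDEX = 2;

    // Returns the field at `index`, or empty if the record has too few fields,
    // instead of throwing ArrayIndexOutOfBoundsException inside the operator.
    public static Optional<String> fieldAt(String record, int index) {
        String[] fields = record.split(",", -1); // -1 keeps trailing empty fields
        if (index < 0 || index >= fields.length) {
            return Optional.empty();
        }
        return Optional.of(fields[index]);
    }

    // Hypothetical body of a MapFunction<String, String>#map.
    public static String mapRecord(String record) {
        return fieldAt(record, FIELD_INDEX)
                .map(String::trim)
                .orElse("<missing field " + FIELD_INDEX + ">");
    }

    public static void main(String[] args) {
        System.out.println(mapRecord("a,b, c ")); // prints "c"
        System.out.println(mapRecord("a,b"));     // prints "<missing field 2>"
    }
}
```

Whether a malformed record should be replaced by a placeholder, dropped (for example with a `flatMap` instead of a `map`), or rejected loudly depends on the application; the point is that the bounds check happens before the array access.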

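I could not see this exception in the logs at all. Only after breaking the application down, step by step, into smaller pieces did I finally see it in the log. After fixing the obvious error in the array access, the exceptions above (java.lang.InterruptedException and org.apache.kafka.common.KafkaException) went away.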
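One way to do this kind of isolation without Kafka in the loop is to call the operator's logic directly on sample records, so the root exception surfaces with its own stack trace. The Kafka errors in the log are raised while the producer is being closed during operator disposal, so they are presumably side effects of the task failing, not the cause. A minimal sketch follows; `buggyMap` is a hypothetical stand-in for the faulty map function, and `firstFailure` is an illustrative helper, not part of any Flink API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class IsolateOperator {

    // Hypothetical stand-in for the buggy map logic: reads field 2 without a bounds check.
    public static String buggyMap(String record) {
        String[] fields = record.split(",");
        return fields[2];
    }

    // Feeds each sample through `fn` and reports the first one that throws,
    // printing the full stack trace of the root cause. Returns null if all succeed.
    public static <I, O> String firstFailure(List<I> samples, Function<I, O> fn) {
        for (I sample : samples) {
            try {
                fn.apply(sample);
            } catch (RuntimeException e) {
                e.printStackTrace();
                return "record " + sample + " failed with " + e.getClass().getSimpleName();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<String> samples = Arrays.asList("a,b,c", "a,b");
        // prints "record a,b failed with ArrayIndexOutOfBoundsException"
        System.out.println(firstFailure(samples, IsolateOperator::buggyMap));
    }
}
```

The same idea works as a unit test per operator: each map, filter or flatMap function is plain code that can be exercised with hand-written inputs before it is wired between the Kafka source and sink.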