Apache Flink throws "Partition already finished" exceptions

We run Apache Flink 1.9 on Kubernetes. Some of our jobs consume Kafka events and aggregate counts every minute. The jobs had been running fine, but recently they suddenly started throwing many errors:

    java.lang.RuntimeException: Partition already finished.
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)

The code that throws the error is in a listener that receives events and emits watermarks:

    // We get a SourceContext from Flink via an underlying API library
    // (sorry, we cannot share that library's source code here).
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.watermark.Watermark

    protected var context: SourceFunction.SourceContext[T] = ...

    validEventsSorted.foreach { event =>
      try {
        // Emit the event with its occurrence time, then a watermark
        // trailing 30 seconds behind it.
        context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
        context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
      } catch {
        case e: Throwable =>
          logger.error(
            s"Failed to add to context. Event EID: ${event.nakadiMetaData.eid}." +
              s" Event: $event",
            e
          )
      }
    }

Restarting the Flink JobManager and TaskManagers stops the errors, but the problem may come back later.

As far as I understand (and guess), Partition already finished is thrown when an operator tries to deliver events to the next operator (partition), but I don't see how that can happen here.

Here is the code of our Source:

    import java.util.concurrent.{Executors, Future}

    import scala.reflect.{ClassTag, classTag}

    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

    class SourceImpl[T: ClassTag](
        listener: KafkaListener[T]
    ) extends RichParallelSourceFunction[T] {

      @volatile private var isCancelled: Boolean = false

      @volatile private var consumerFuture: Future[_] = _

      override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
        while (!isCancelled) {
          try {
            val runnable = KafkaClient
              .stream(subscription)
              .withStreamParameters(streamParameters)
              .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)

            // A new single-thread executor is created on every loop iteration.
            val executorService = Executors.newSingleThreadExecutor()
            consumerFuture = executorService.submit(runnable)
            consumerFuture.get() // This is blocking
          } catch {
            case e: Throwable =>
              logger.warn(s"Unknown error consuming events", e)
          }
        }
      }

      override def cancel(): Unit = {
        isCancelled = true
        consumerFuture.cancel(true)
      }
    }

Does anyone know why this happens and how to fix it? Thanks a lot!

It turned out our SourceImpl had a bug. When the job was cancelled by the JobManager, the cancel method was invoked, but it could fail: the executorService was never shut down, so the runnable kept running in the TaskManager, consuming events and emitting watermarks. Since the job had already been marked as cancelled in both the JobManager and the TaskManager, emitting a watermark then caused the Partition already finished exception.
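
(As a belt-and-braces measure, the listener itself could also stop emitting once the source is cancelled. The snippet below is only a sketch, assuming the listener can see the source's cancellation flag through a hypothetical isSourceCancelled accessor; it is not part of the fix we actually shipped.)

    // Sketch only: skip emission once the source has been cancelled, so a
    // leaked consumer thread cannot write to a finished partition.
    // `isSourceCancelled` is a hypothetical accessor to SourceImpl's flag.
    validEventsSorted.foreach { event =>
      if (!isSourceCancelled) {
        context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
        context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
      }
    }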

So we fixed it by explicitly shutting down the ExecutorService:

    // Shutdown executorService
    if (executorService != null && !executorService.isShutdown) {
      executorService.shutdownNow()
    }
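
shutdownNow() interrupts the consumer thread immediately. If you prefer an orderly stop first, the standard java.util.concurrent pattern below should also work here (a sketch; the 5-second grace period is our assumption):

    import java.util.concurrent.TimeUnit

    // Sketch: request an orderly shutdown first, then force it if the
    // consumer does not terminate within the assumed 5-second grace period.
    executorService.shutdown()
    if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
      executorService.shutdownNow()
    }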

The full code is below:

    import java.util.concurrent.{Executors, Future}

    import scala.reflect.{ClassTag, classTag}

    import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

    class SourceImpl[T: ClassTag](
        listener: KafkaListener[T]
    ) extends RichParallelSourceFunction[T] {

      @volatile private var isCancelled: Boolean = false

      @volatile private var consumerFuture: Future[_] = _

      override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
        // Create the executor once, outside the loop, so we can shut it down.
        val executorService = Executors.newSingleThreadExecutor()

        while (!isCancelled) {
          try {
            val runnable = KafkaClient
              .stream(subscription)
              .withStreamParameters(streamParameters)
              .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)

            consumerFuture = executorService.submit(runnable)
            consumerFuture.get() // This is blocking
          } catch {
            case e: Throwable =>
              logger.warn(s"Unknown error consuming events", e)
          }
        }

        // Shut down the executorService so no consumer thread outlives the task.
        if (executorService != null && !executorService.isShutdown) {
          executorService.shutdownNow()
        }
      }

      override def cancel(): Unit = {
        isCancelled = true
        consumerFuture.cancel(true)
      }
    }

By the way, the reason we use a dedicated ExecutorService is to run the listener in a separate thread pool, so that it does not interfere with Flink's own thread pool. If you think this is not the right approach, please leave a comment here. Thanks!
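
If you keep the dedicated pool, one refinement worth considering (our assumption, not something Flink requires) is a ThreadFactory that names the consumer thread and marks it as a daemon, so a leaked thread is easy to spot in thread dumps and cannot keep the JVM alive:

    import java.util.concurrent.{Executors, ThreadFactory}

    // Sketch: name the consumer thread and make it a daemon. The name
    // "kafka-listener-consumer" is purely illustrative.
    val threadFactory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "kafka-listener-consumer")
        t.setDaemon(true)
        t
      }
    }
    val executorService = Executors.newSingleThreadExecutor(threadFactory)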