Apache Flink throws "Partition already finished" exceptions
We are running Apache Flink 1.9 on Kubernetes. We have jobs that consume Kafka events and aggregate counts every minute. The jobs had been working fine, but recently we suddenly started seeing a lot of these errors:
java.lang.RuntimeException: Partition already finished.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
The code that raises the error lives in a listener that receives the events and emits watermarks:
// We use an underlying API lib to get a SourceContext from Flink; sorry not to have the source code here
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.watermark.Watermark

protected var context: SourceFunction.SourceContext[T] = ...

validEventsSorted.foreach { event =>
  try {
    context.collectWithTimestamp(event, event.occurredAt.toEpochMilli)
    context.emitWatermark(new Watermark(event.occurredAt.minusSeconds(30).toEpochMilli))
  } catch {
    case e: Throwable =>
      logger.error(
        s"Failed to add to context. Event EID: ${event.nakadiMetaData.eid}." +
          s" Event: $event",
        e
      )
  }
}
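For reference, emitting a watermark that trails each event's timestamp by 30 seconds is essentially what Flink's built-in BoundedOutOfOrdernessTimestampExtractor does (it is available in Flink 1.9, though it tracks the maximum timestamp seen rather than the latest event). A minimal sketch, assuming a hypothetical HasOccurredAt trait standing in for our event type's occurredAt accessor:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// Hypothetical trait: any event exposing the occurredAt instant used above
trait HasOccurredAt { def occurredAt: java.time.Instant }

// Extracts the event-time timestamp; Flink then emits watermarks lagging 30s behind it
class OccurredAtExtractor[T <: HasOccurredAt]
    extends BoundedOutOfOrdernessTimestampExtractor[T](Time.seconds(30)) {
  override def extractTimestamp(event: T): Long = event.occurredAt.toEpochMilli
}

// Usage on a DataStream[T], instead of hand-emitting watermarks from the source:
// stream.assignTimestampsAndWatermarks(new OccurredAtExtractor[MyEvent])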
Restarting the Flink JobManager and the TaskManagers stops the errors, but the problem may come back later.
From what I understand (and guess), Partition already finished is raised when an operator tries to deliver events to the next operator (partition), but I don't understand how that can happen.
Here is the code of our Source:
import java.util.concurrent.Executors
import scala.reflect.{ClassTag, classTag}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class SourceImpl[T: ClassTag](
    listener: KafkaListener[T]
) extends RichParallelSourceFunction[T] {

  @volatile private var isCancelled: Boolean = false
  @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    while (!isCancelled) {
      try {
        val runnable = KafkaClient
          .stream(subscription)
          .withStreamParameters(streamParameters)
          .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)
        // A new single-thread executor is created on every loop iteration
        val executorService = Executors.newSingleThreadExecutor()
        consumerFuture = executorService.submit(runnable)
        consumerFuture.get() // This is blocking
      } catch {
        case e: Throwable =>
          logger.warn(s"Unknown error consuming events", e)
      }
    }
  }

  override def cancel(): Unit = {
    isCancelled = true
    consumerFuture.cancel(true)
  }
}
Does anyone know why this happens and how to fix it? Many thanks!
It turned out that our SourceImpl had a bug. When the job is cancelled by the JobManager, the cancel method is invoked, but it may fail, in which case the executorService is never shut down and the runnable keeps running in the TaskManager, consuming events and emitting watermarks. Because the job is already marked as cancelled in the JobManager and the TaskManager, emitting a watermark then leads to the Partition already finished exception.
So we fixed it by explicitly shutting down the ExecutorService:
// Shutdown executorService
if (executorService != null && !executorService.isShutdown) {
executorService.shutdownNow()
}
The full code looks like this:
import java.util.concurrent.Executors
import scala.reflect.{ClassTag, classTag}
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class SourceImpl[T: ClassTag](
    listener: KafkaListener[T]
) extends RichParallelSourceFunction[T] {

  @volatile private var isCancelled: Boolean = false
  @volatile private var consumerFuture: java.util.concurrent.Future[_] = _

  override def run(ctx: SourceFunction.SourceContext[T]): Unit = {
    // One executor for the lifetime of the source, created outside the loop
    val executorService = Executors.newSingleThreadExecutor()
    while (!isCancelled) {
      try {
        val runnable = KafkaClient
          .stream(subscription)
          .withStreamParameters(streamParameters)
          .runnable(classTag[T].runtimeClass.asInstanceOf[Class[T]], listener)
        consumerFuture = executorService.submit(runnable)
        consumerFuture.get() // This is blocking
      } catch {
        case e: Throwable =>
          logger.warn(s"Unknown error consuming events", e)
      }
    }
    // Shutdown executorService once run() exits, so no listener thread outlives the task
    if (executorService != null && !executorService.isShutdown) {
      executorService.shutdownNow()
    }
  }

  override def cancel(): Unit = {
    isCancelled = true
    consumerFuture.cancel(true)
  }
}
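If shutdownNow() feels too abrupt for your listener, the standard java.util.concurrent idiom is to attempt a graceful shutdown first and only force one after a grace period. A sketch (the 5-second timeout is an assumption to tune for your workload):

import java.util.concurrent.{ExecutorService, TimeUnit}

def stopExecutor(executorService: ExecutorService): Unit = {
  executorService.shutdown() // accept no new tasks; let the running listener finish
  try {
    if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
      executorService.shutdownNow() // force-interrupt if it does not stop in time
    }
  } catch {
    case _: InterruptedException =>
      executorService.shutdownNow()
      Thread.currentThread().interrupt() // preserve the interrupt status
  }
}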
By the way, the reason we use a dedicated ExecutorService is to run the listener in its own thread pool, so that it does not interfere with the Flink thread pool. If you think this is not the right approach, please leave a comment here. Thanks!
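One more detail worth noting about the code above: consumerFuture may still be null if cancel() is called before run() has submitted the first task, and Future.cancel(true) only interrupts the worker thread, so a listener that swallows the interrupt keeps running, which is exactly the failure mode described above. A slightly more defensive cancel(), assuming the same fields as in the class above:

// Defensive variant of cancel(), using the same fields as the class above
override def cancel(): Unit = {
  isCancelled = true
  val future = consumerFuture // read the @volatile field once
  if (future != null) {
    future.cancel(true) // interrupt the listener; it may still ignore the interrupt
  }
}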