Flink 1.7.2 Stateful-processing with KeyedProcessFunction 和 RocksDB state-backend 接收异步异常

Receiving Asynchronous Exception in Flink 1.7.2 Stateful-processing with KeyedProcessFunction and RocksDB state-backend

我使用 Flink 1.7.2 和 Kafka 2.2 作为消费者和生产者编写了一个简单的字数统计应用程序。我对 Kafka 生产者使用 Exactly-Once 语义,KeyedProcessFunction 用于有状态处理,MapState 保持我的状态,RocksDB 使用增量检查点作为我的状态后端。

当我从 IntelliJ 运行 应用程序工作得很好,但是当我将它提交到我的本地 Flink 集群时,我收到 AsynchronousException 异常并且 Flink 应用程序在每 0-20 次后不断重试秒。有没有人遇到过这个问题?从配置的角度来看,我是否遗漏了什么?

这是我的代码:

class KeyedProcFuncWordCount extends KeyedProcessFunction[String, String, (String, Int)]
{
  private var state: MapState[String, Int] = _

  override def open(parameters: Configuration): Unit =
  {
    state = getRuntimeContext
      .getMapState(new MapStateDescriptor[String, Int]("wordCountState", createTypeInformation[String],
        createTypeInformation[Int]))
  }

  override def processElement(value: String,
                              ctx: KeyedProcessFunction[String, String, (String, Int)]#Context,
                              out: Collector[(String, Int)]): Unit =
  {
    val currentSum =
      if (state.contains(value)) state.get(value)
      else 0

    val newSum = currentSum + 1

    state.put(value, newSum)

    out.collect((value, newSum))
  }
}

object KafkaProcFuncWordCount
{
  val bootstrapServers = "localhost:9092"
  val inTopic = "test"
  val outTopic = "test-out"

  def main(args: Array[String]): Unit =
  {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(30000)
    env.setStateBackend(new RocksDBStateBackend("file:///tmp/data/db.rdb", true))

    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val consumerProps = new Properties
    consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProcFuncWordCount")
    consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")

    val kafkaConsumer = new FlinkKafkaConsumer011[String](inTopic, new SimpleStringSchema, consumerProps)

    val producerProps = new Properties
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647")
    producerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
    producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
    producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")

    val kafkaProducer = new FlinkKafkaProducer011[String](
      outTopic,
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
      producerProps,
      Optional.of(new FlinkFixedPartitioner[String]),
      FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
      5
    )

    val text = env.addSource(kafkaConsumer)

    val runningCounts = text
      .keyBy(_.toString)
      .process(new KeyedProcFuncWordCount)
      .map(_.toString())

    runningCounts
      .addSink(kafkaProducer)

    env.execute("KafkaProcFuncWordCount")
  }
}

这是不断重复的 flink taskexecutor 日志的一部分:

2019-07-05 14:05:47,548 INFO  org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer  - Flushing new partitions
2019-07-05 14:05:47,552 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011  - Starting FlinkKafkaProducer (1/1) to produce into default topic test-out
2019-07-05 14:05:47,775 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to fail task externally KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe).
2019-07-05 14:05:47,776 INFO  org.apache.flink.runtime.taskmanager.Task                     - KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer$TransactionStateSerializerSnapshot.<init>(FlinkKafkaProducer011.java:1244)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer.snapshotConfiguration(FlinkKafkaProducer011.java:1235)
    at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializerConfigSnapshot.<init>(TwoPhaseCommitSinkFunction.java:847)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:792)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:615)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
    at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy.callInternal(DefaultOperatorStateBackend.java:711)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy.callInternal(DefaultOperatorStateBackend.java:696)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more

非常感谢您的帮助。

你能double-check你没有在你的 jar 中打包 Flink 核心依赖项(flink-java、flink-streaming-java、flink-runtime、...)吗?另外 double-check 您的 运行 集群中的 Flink 版本与 Kafka 连接器的依赖项 (flink-kafka-connector) 相同。 flink-kakfa-connector(就像所有连接器都需要成为您的 fatjar 的一部分)。

希望这对您有所帮助。

干杯,

康斯坦丁