Flink 1.7.2 Stateful-processing with KeyedProcessFunction 和 RocksDB state-backend 接收异步异常
Receiving Asynchronous Exception in Flink 1.7.2 Stateful-processing with KeyedProcessFunction and RocksDB state-backend
我使用 Flink 1.7.2 和 Kafka 2.2 作为消费者和生产者编写了一个简单的字数统计应用程序。我对 Kafka 生产者使用 Exactly-Once 语义,KeyedProcessFunction
用于有状态处理,MapState
保持我的状态,RocksDB
使用增量检查点作为我的状态后端。
当我从 IntelliJ 运行 应用程序工作得很好,但是当我将它提交到我的本地 Flink 集群时,我收到 AsynchronousException
异常并且 Flink 应用程序在每 0-20 次后不断重试秒。有没有人遇到过这个问题?从配置的角度来看,我是否遗漏了什么?
这是我的代码:
class KeyedProcFuncWordCount extends KeyedProcessFunction[String, String, (String, Int)]
{
private var state: MapState[String, Int] = _
override def open(parameters: Configuration): Unit =
{
state = getRuntimeContext
.getMapState(new MapStateDescriptor[String, Int]("wordCountState", createTypeInformation[String],
createTypeInformation[Int]))
}
override def processElement(value: String,
ctx: KeyedProcessFunction[String, String, (String, Int)]#Context,
out: Collector[(String, Int)]): Unit =
{
val currentSum =
if (state.contains(value)) state.get(value)
else 0
val newSum = currentSum + 1
state.put(value, newSum)
out.collect((value, newSum))
}
}
object KafkaProcFuncWordCount
{
val bootstrapServers = "localhost:9092"
val inTopic = "test"
val outTopic = "test-out"
def main(args: Array[String]): Unit =
{
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(30000)
env.setStateBackend(new RocksDBStateBackend("file:///tmp/data/db.rdb", true))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val consumerProps = new Properties
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProcFuncWordCount")
consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
val kafkaConsumer = new FlinkKafkaConsumer011[String](inTopic, new SimpleStringSchema, consumerProps)
val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647")
producerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
val kafkaProducer = new FlinkKafkaProducer011[String](
outTopic,
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
producerProps,
Optional.of(new FlinkFixedPartitioner[String]),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
5
)
val text = env.addSource(kafkaConsumer)
val runningCounts = text
.keyBy(_.toString)
.process(new KeyedProcFuncWordCount)
.map(_.toString())
runningCounts
.addSink(kafkaProducer)
env.execute("KafkaProcFuncWordCount")
}
}
这是不断重复的 flink taskexecutor 日志的一部分:
2019-07-05 14:05:47,548 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer - Flushing new partitions
2019-07-05 14:05:47,552 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (1/1) to produce into default topic test-out
2019-07-05 14:05:47,775 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe).
2019-07-05 14:05:47,776 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer$TransactionStateSerializerSnapshot.<init>(FlinkKafkaProducer011.java:1244)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer.snapshotConfiguration(FlinkKafkaProducer011.java:1235)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializerConfigSnapshot.<init>(TwoPhaseCommitSinkFunction.java:847)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:792)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:615)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy.callInternal(DefaultOperatorStateBackend.java:711)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
非常感谢您的帮助。
你能double-check你没有在你的 jar 中打包 Flink 核心依赖项(flink-java、flink-streaming-java、flink-runtime、...)吗?另外 double-check 您的 运行 集群中的 Flink 版本与 Kafka 连接器的依赖项 (flink-kafka-connector) 相同。 flink-kakfa-connector(就像所有连接器都需要成为您的 fatjar 的一部分)。
希望这对您有所帮助。
干杯,
康斯坦丁
我使用 Flink 1.7.2 和 Kafka 2.2 作为消费者和生产者编写了一个简单的字数统计应用程序。我对 Kafka 生产者使用 Exactly-Once 语义,KeyedProcessFunction
用于有状态处理,MapState
保持我的状态,RocksDB
使用增量检查点作为我的状态后端。
当我从 IntelliJ 运行 应用程序工作得很好,但是当我将它提交到我的本地 Flink 集群时,我收到 AsynchronousException
异常并且 Flink 应用程序在每 0-20 次后不断重试秒。有没有人遇到过这个问题?从配置的角度来看,我是否遗漏了什么?
这是我的代码:
class KeyedProcFuncWordCount extends KeyedProcessFunction[String, String, (String, Int)]
{
private var state: MapState[String, Int] = _
override def open(parameters: Configuration): Unit =
{
state = getRuntimeContext
.getMapState(new MapStateDescriptor[String, Int]("wordCountState", createTypeInformation[String],
createTypeInformation[Int]))
}
override def processElement(value: String,
ctx: KeyedProcessFunction[String, String, (String, Int)]#Context,
out: Collector[(String, Int)]): Unit =
{
val currentSum =
if (state.contains(value)) state.get(value)
else 0
val newSum = currentSum + 1
state.put(value, newSum)
out.collect((value, newSum))
}
}
object KafkaProcFuncWordCount
{
val bootstrapServers = "localhost:9092"
val inTopic = "test"
val outTopic = "test-out"
def main(args: Array[String]): Unit =
{
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(30000)
env.setStateBackend(new RocksDBStateBackend("file:///tmp/data/db.rdb", true))
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
val consumerProps = new Properties
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "KafkaProcFuncWordCount")
consumerProps.setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
val kafkaConsumer = new FlinkKafkaConsumer011[String](inTopic, new SimpleStringSchema, consumerProps)
val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "2147483647")
producerProps.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
producerProps.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
val kafkaProducer = new FlinkKafkaProducer011[String](
outTopic,
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema),
producerProps,
Optional.of(new FlinkFixedPartitioner[String]),
FlinkKafkaProducer011.Semantic.EXACTLY_ONCE,
5
)
val text = env.addSource(kafkaConsumer)
val runningCounts = text
.keyBy(_.toString)
.process(new KeyedProcFuncWordCount)
.map(_.toString())
runningCounts
.addSink(kafkaProducer)
env.execute("KafkaProcFuncWordCount")
}
}
这是不断重复的 flink taskexecutor 日志的一部分:
2019-07-05 14:05:47,548 INFO org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaProducer - Flushing new partitions
2019-07-05 14:05:47,552 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011 - Starting FlinkKafkaProducer (1/1) to produce into default topic test-out
2019-07-05 14:05:47,775 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe).
2019-07-05 14:05:47,776 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Map -> Sink: Unnamed (1/1) (f61d24c993f400394eaa028981a26bfe) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).}
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 6 for operator KeyedProcess -> Map -> Sink: Unnamed (1/1).
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
... 5 more
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot.<init>(Ljava/util/function/Supplier;)V
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer$TransactionStateSerializerSnapshot.<init>(FlinkKafkaProducer011.java:1244)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011$TransactionStateSerializer.snapshotConfiguration(FlinkKafkaProducer011.java:1235)
at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.<init>(CompositeTypeSerializerConfigSnapshot.java:53)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializerConfigSnapshot.<init>(TwoPhaseCommitSinkFunction.java:847)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:792)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$StateSerializer.snapshotConfiguration(TwoPhaseCommitSinkFunction.java:615)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy.callInternal(DefaultOperatorStateBackend.java:711)
at org.apache.flink.runtime.state.DefaultOperatorStateBackend$DefaultOperatorStateBackendSnapshotStrategy.callInternal(DefaultOperatorStateBackend.java:696)
at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
... 7 more
非常感谢您的帮助。
你能double-check你没有在你的 jar 中打包 Flink 核心依赖项(flink-java、flink-streaming-java、flink-runtime、...)吗?另外 double-check 您的 运行 集群中的 Flink 版本与 Kafka 连接器的依赖项 (flink-kafka-connector) 相同。 flink-kakfa-connector(就像所有连接器都需要成为您的 fatjar 的一部分)。
希望这对您有所帮助。
干杯,
康斯坦丁