Flink Scala Job Runtime Error : java.lang.NoSuchMethodError

I am trying to run a simple Flink streaming job on AWS EMR. For now the goal is simple:

  1. Consume data from Kafka in Flink.
  2. Write it to another Kafka topic.

I am using the following dependencies:

scalaVersion := "2.11.8"
val flinkVersion = "1.11.1"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)

The Flink code I am using is:

private val serdeSchema = new SimpleStringSchema

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
     .addSource(createKafkaConsumer(kafkaInputTopic
       , kafkaBrokers, kafkaConfig("consumerGroupId").toString
       , kafkaConfig("defaultReset").toString))
  
    stream
     .map((s: String) => s)
     .addSink(createKafkaProducer(kafkaOutputTopic, kafkaBrokers))
  
    env.execute(jobConfig("jobName").toString)
    
  }
  
  def createKafkaProducer(kafkaTopic: String, kafkaBrokers: String): FlinkKafkaProducer[String] = {
    
    val producer = new FlinkKafkaProducer[String](kafkaBrokers,
      kafkaTopic, serdeSchema)
    producer
  }
  
  def createKafkaConsumer(kafkaInputTopic: String
                          , kafkaBrokers: String
                          , consumerGroup:String
                          , defaultReset: String): FlinkKafkaConsumer[String] = {
  
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", kafkaBrokers)
    properties.setProperty("group.id", consumerGroup)
    properties.setProperty("enable.auto.commit" , "false")
    properties.setProperty("auto.offset.reset" , defaultReset)
    
    val consumer = new FlinkKafkaConsumer[String](kafkaInputTopic, serdeSchema, properties)
    consumer
  }

I built an assembly jar with sbt and ran the job on EMR with the following command:

/bin/flink run -c com.example.FlinkConsumer flink/target/scala-2.11/flink-assembly-0.1.jar

Here is the stack trace:

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c458520153e875811c46c386b9ec605e)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null(ClusterClientJobClientAdapter.java:112)
    at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
    at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.client.program.rest.RestClusterClient.lambda$pollResourceAsync(RestClusterClient.java:565)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
    at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay(FutureUtils.java:291)
    at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
    at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
    at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:575)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:943)
    at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
    at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null(ClusterClientJobClientAdapter.java:110)
    ... 19 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NoSuchMethodError: org.apache.flink.api.common.serialization.SerializationSchema.open(Lorg/apache/flink/api/common/serialization/SerializationSchema$InitializationContext;)V
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaSerializationSchemaWrapper.open(KafkaSerializationSchemaWrapper.java:61)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.open(FlinkKafkaProducer.java:808)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)

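This looks like a version problem, but I have tried various versions without success. I cannot find this open method anywhere in my own code; my guess is that the serialization schema's open method is called at runtime and cannot be found. Can anyone help? I am new to Flink.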

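If you are using EMR's Flink support, most Flink libraries should be marked as "provided" so that they are not bundled into your jar, because they are already on the classpath of the Flink installation that EMR provides. You still need to explicitly include anything EMR does not provide (for example, flink-connector-kafka).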
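
As a rough sketch of what that could look like in build.sbt, reusing the dependency list from the question: the core Flink modules get the "provided" scope, while the Kafka connector stays in the default scope so that it ends up in the assembly jar. A NoSuchMethodError usually means the classes loaded at runtime come from a different version than the one the jar was compiled against. The value of flinkVersion is copied from the question and is only a placeholder here; the assumption is that it should match whatever Flink version your EMR release actually installs.

```scala
// build.sbt (sketch; adjust versions to your environment)
scalaVersion := "2.11.8"

// Assumption: this should equal the Flink version installed on the EMR cluster.
// "1.11.1" is simply the value used in the question.
val flinkVersion = "1.11.1"

libraryDependencies ++= Seq(
  // Already on the classpath of the EMR Flink installation:
  // compile against them, but keep them out of the assembly jar.
  "org.apache.flink" %% "flink-scala"           % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  // Not shipped by EMR, so it has to be bundled into the assembly jar.
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion
)
```

With this layout the connector in the jar and the runtime on the cluster are driven by the same flinkVersion value, which makes a mismatch easier to spot.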