Spark and AWS Kinesis

I'm trying to connect to Kinesis using Apache Spark 2.4.0 in standalone mode. I provide my credentials like this:

val cred = SparkAWSCredentials.builder.basicCredentials("{awsAccessKeyId}", "{awsSecretAccessKey}").build()

I get the following error:

java.lang.NoSuchMethodError: org.apache.spark.internal.Logging.$init$(Lorg/apache/spark/internal/Logging;)V
    at org.apache.spark.streaming.kinesis.BasicCredentials.<init>(SparkAWSCredentials.scala:51)
    at org.apache.spark.streaming.kinesis.SparkAWSCredentials$Builder.basicCredentials(SparkAWSCredentials.scala:116)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:34)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:39)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:41)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:43)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:45)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:47)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:49)
    at $line18.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:51)
    at $line18.$read$$iw$$iw$$iw$$iw.<init>(<console>:53)
    at $line18.$read$$iw$$iw$$iw.<init>(<console>:55)
    at $line18.$read$$iw$$iw.<init>(<console>:57)
    at $line18.$read$$iw.<init>(<console>:59)
    at $line18.$read.<init>(<console>:61)
    at $line18.$read$.<init>(<console>:65)
    at $line18.$read$.<clinit>(<console>)
    at $line18.$eval$.$print$lzycompute(<console>:7)
    at $line18.$eval$.$print(<console>:6)
    at $line18.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:819)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:691)
    at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:425)
    at org.apache.spark.repl.SparkILoop$$anonfun$process.apply$mcZ$sp(SparkILoop.scala:285)
    at org.apache.spark.repl.SparkILoop.runClosure(SparkILoop.scala:159)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:182)
    at org.apache.spark.repl.Main$.doMain(Main.scala:78)
    at org.apache.spark.repl.Main$.main(Main.scala:58)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

When I try to connect using credentials supplied as environment variables, or set them like this:

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "{}")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "{}")

I get this error instead:

java.lang.NoSuchMethodError: scala.Product.$init$(Lscala/Product;)V
    at org.apache.spark.streaming.kinesis.DefaultCredentials$.<init>(SparkAWSCredentials.scala:39)
    at org.apache.spark.streaming.kinesis.DefaultCredentials$.<clinit>(SparkAWSCredentials.scala)
    at org.apache.spark.streaming.kinesis.KinesisInputDStream$Builder.$anonfun$buildWithMessageHandler(KinesisInputDStream.scala:291)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.streaming.kinesis.KinesisInputDStream$Builder.buildWithMessageHandler(KinesisInputDStream.scala:291)
    at org.apache.spark.streaming.kinesis.KinesisInputDStream$Builder.build(KinesisInputDStream.scala:302)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:43)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:48)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:50)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:52)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:54)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:56)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:58)
    at $line24.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:60)
    at $line24.$read$$iw$$iw$$iw$$iw.<init>(<console>:62)
    at $line24.$read$$iw$$iw$$iw.<init>(<console>:64)
    at $line24.$read$$iw$$iw.<init>(<console>:66)
    at $line24.$read$$iw.<init>(<console>:68)
    at $line24.$read.<init>(<console>:70)
    at $line24.$read$.<init>(<console>:74)
    at $line24.$read$.<clinit>(<console>)
    at $line24.$eval$.$print$lzycompute(<console>:7)
    at $line24.$eval$.$print(<console>:6)
    at $line24.$eval.$print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:793)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1054)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq.apply(IMain.scala:645)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest$$anonfun$loadAndRunReq.apply(IMain.scala:644)
    at scala.reflect.internal.util.ScalaClassLoader$class.asContext(ScalaClassLoader.scala:31)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:19)
    at scala.tools.nsc.interpreter.IMain$WrappedRequest.loadAndRunReq(IMain.scala:644)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:576)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:572)
    at scala.tools.nsc.interpreter.ILoop$$anonfun.apply(ILoop.scala:762)
    at scala.tools.nsc.interpreter.ILoop$$anonfun.apply(ILoop.scala:762)
    at scala.tools.nsc.interpreter.IMain.withLabel(IMain.scala:116)
    at scala.tools.nsc.interpreter.ILoop.interpretCode(ILoop.scala:762)
    at scala.tools.nsc.interpreter.ILoop.pasteCommand(ILoop.scala:776)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands.apply(ILoop.scala:217)
    at scala.tools.nsc.interpreter.ILoop$$anonfun$standardCommands.apply(ILoop.scala:217)
    at scala.tools.nsc.interpreter.LoopCommands$LineCmd.apply(LoopCommands.scala:62)
    at scala.tools.nsc.interpreter.ILoop.colonCommand(ILoop.scala:698)
    at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:689)
    at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:404)
    at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:425)
    at org.apache.spark.repl.SparkILoop$$anonfun$process.apply$mcZ$sp(SparkILoop.scala:285)
    at org.apache.spark.repl.SparkILoop.runClosure(SparkILoop.scala:159)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:182)
    at org.apache.spark.repl.Main$.doMain(Main.scala:78)
    at org.apache.spark.repl.Main$.main(Main.scala:58)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:849)
    at org.apache.spark.deploy.SparkSubmit.doRunMain(SparkSubmit.scala:167)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:195)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
    at org.apache.spark.deploy.SparkSubmit$$anon.doSubmit(SparkSubmit.scala:924)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:933)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Can anyone help?

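I ran into the same error. In my case, I was using spark-streaming-kinesis-asl_2.12 instead of spark-streaming-kinesis-asl_2.11. The _2.11/_2.12 suffix is the Scala binary version the connector was compiled for, and the prebuilt Spark 2.4.0 distribution runs on Scala 2.11. A connector compiled for Scala 2.12 calls static trait initializers such as Logging.$init$ and scala.Product.$init$, which do not exist in Scala 2.11 bytecode, hence the NoSuchMethodError.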
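A minimal build.sbt sketch of that fix, assuming an sbt project built against the Scala 2.11 flavour of Spark 2.4.0 (the exact 2.11.12 patch version is an assumption; match whatever your Spark build reports). Using %% instead of a hard-coded _2.11 suffix makes sbt append the Scala binary version taken from scalaVersion, so Spark and the Kinesis connector cannot drift apart. In spark-shell the equivalent is launching with --packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.4.0.

```scala
// build.sbt: a sketch, assuming the Scala 2.11 build of Spark 2.4.0
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  // "provided": spark-streaming is already on the spark-submit / spark-shell classpath
  "org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided",
  // %% expands to spark-streaming-kinesis-asl_2.11 here, i.e. the same Scala
  // binary version as Spark itself; the connector is not bundled with Spark
  "org.apache.spark" %% "spark-streaming-kinesis-asl" % "2.4.0"
)
```

To check which Scala version a running spark-shell uses, look at the "Using Scala version ..." line in its startup banner or evaluate scala.util.Properties.versionString in the REPL.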

Try using:

SparkAWSCredentials credentials = new SparkAWSCredentials.Builder().basicCredentials("***", "****").build();

According to the Scala source, Builder is a class that can be accessed through the SparkAWSCredentials singleton object.
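A sketch of how those credentials plug into a stream from Scala, meant to be pasted into spark-shell with :paste (so the multi-line builder chains are read as one unit) once a Kinesis connector matching Spark's Scala version is on the classpath. In Scala, SparkAWSCredentials.builder is the companion-object method that returns a new Builder. The stream name, application name, region, and endpoint are hypothetical placeholders, and reading the keys from the AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables is an assumption made to avoid hard-coding secrets.

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kinesis.{KinesisInputDStream, SparkAWSCredentials}

// Reuse spark-shell's existing SparkContext `sc`; 10-second batches are an arbitrary choice.
val ssc = new StreamingContext(sc, Seconds(10))

// `builder` on the SparkAWSCredentials companion object returns a new Builder.
// Assumption: keys come from environment variables (sys.env throws if one is unset).
val creds: SparkAWSCredentials = SparkAWSCredentials.builder
  .basicCredentials(sys.env("AWS_ACCESS_KEY_ID"), sys.env("AWS_SECRET_ACCESS_KEY"))
  .build()

val stream = KinesisInputDStream.builder
  .streamingContext(ssc)
  .streamName("my-stream")                                // hypothetical stream name
  .checkpointAppName("my-kinesis-app")                    // hypothetical KCL application name
  .regionName("us-east-1")                                // assumption: region of the stream
  .endpointUrl("https://kinesis.us-east-1.amazonaws.com") // endpoint for that region
  .kinesisCredentials(creds) // without this, build() falls back to DefaultCredentials
  .build()

// Records arrive as Array[Byte]; decode them as UTF-8 text and print a sample of each batch.
stream.map(bytes => new String(bytes, "UTF-8")).print()

ssc.start()
ssc.awaitTermination()
```

The fallback noted in the kinesisCredentials comment is the code path visible in the second stack trace, where build() initializes DefaultCredentials via Option.getOrElse.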