Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
I am unable to send Avro-formatted messages to a Kafka topic from a Spark Streaming application, and there is very little sample code online for Avro with Spark Streaming. The `to_avro` method does not take an Avro schema, so how does it encode to Avro format?
Can someone help with the exception below?
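For context, my understanding is that `to_avro` derives the Avro schema from the Catalyst type of the column it is given, so no explicit schema is required. A minimal sketch of the pattern I am aiming for (the broker address and topic name are placeholders, and writing with `format("kafka")` assumes the `spark-sql-kafka-0-10` artifact is on the classpath):

```scala
import org.apache.spark.sql.avro._                   // Spark 2.4.x: to_avro / from_avro live here
import org.apache.spark.sql.functions.{col, struct}

val df = dataset.toDF                                // the Dataset from the question

// to_avro needs no explicit schema: it serializes the struct column to Avro
// binary using a schema derived from the column's Catalyst data type.
val out = df.select(to_avro(struct(df.columns.map(col): _*)).alias("value"))

out.write
  .format("kafka")                                   // the Kafka sink reads the "value" column
  .option("kafka.bootstrap.servers", "host:9092")    // placeholder broker
  .option("topic", "some-topic")                     // placeholder topic
  .save()
```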
Dependencies:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.12</artifactId>
    <version>2.4.4</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
Below is the code that publishes to the Kafka topic:
dataset.toDF
  .select(to_avro(struct(dataset.toDF.columns.map(column): _*)))
  .alias("value")
  .distinct
  .write.format("avro")
  .option(KafkaConstants.BOOTSTRAP_SERVER, priBootStrapServers)
  .option(ApplicationConstants.TOPIC_KEY, publishPriTopic)
  .save()
Below is the exception:
Caused by: java.util.ServiceConfigurationError: org.apache.spark.sql.sources.DataSourceRegister: Provider org.apache.spark.sql.avro.AvroFileFormat could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader.next(ServiceLoader.java:480)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:614)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:241)
at com.walmart.replenishment.edf.dao.EdfOwBuzzerDao$.saveToCassandra(EdfOwBuzzerDao.scala:47)
at com.walmart.replenishment.edf.process.BuzzerService$.updateScrItemPriStatus(BuzzerService.scala:119)
at com.walmart.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream.apply(BuzzerStreamProcessor.scala:36)
at com.walmart.replenishment.edf.process.BuzzerStreamProcessor$$anonfun$processConsumerInputStream.apply(BuzzerStreamProcessor.scala:28)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$$anonfun$apply$mcV$sp.apply(DStream.scala:628)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$$anonfun$apply$mcV$sp.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply$mcV$sp(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:257)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run.apply(JobScheduler.scala:257)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:256)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileFormat.$init$(Lorg/apache/spark/sql/execution/datasources/FileFormat;)V
at org.apache.spark.sql.avro.AvroFileFormat.<init>(AvroFileFormat.scala:44)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
The Scala version of your spark-avro artifact should match the Scala version of spark-core; your POM mixes spark-avro_2.12 with spark-core_2.11. The `NoSuchMethodError` on `FileFormat.$init$` is the typical symptom of mixing Scala 2.11 and 2.12 artifacts, since traits are compiled differently across Scala binary versions.
You can supply the matching artifact at submit time, e.g. spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.4 ... for a Scala 2.12 build, or spark-submit --jars "spark-avro_2.11-2.4.4.jar" for a Scala 2.11 build.
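For example, a consistent set of Maven coordinates, assuming you stay on Scala 2.11 and Spark 2.4.0 (the versions here are illustrative), would be:

```xml
<!-- All Spark artifacts share the same Scala suffix (_2.11) and Spark line -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>
```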
In short, when you use Databricks Avro you should also use the Apache Avro jars.
Reference: https://spark.apache.org/docs/latest/sql-data-sources-avro.html#deploying
Take a look at this ticket. The issue seems to exist in 2.4.4 and 2.4.5. I am still on version 2.4.4; switching to the package org.apache.spark:spark-avro_2.11:2.4.4 solved the problem for me.
I had to use 2.12:2.4.5 (org.apache.spark:spark-avro_2.12:2.4.5) for my Dataproc cluster on the 1.5 image (Spark version 2.4). None of the other versions (2.11:2.4.5 / 2.11:2.4.4) worked.