Why does format("kafka") fail with "Failed to find data source: kafka." (even with uber-jar)?
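I am using HDP-2.6.3.0 with the Spark2 package 2.2.0.
I am trying to write a Kafka consumer using the Structured Streaming API, but I get the following error after submitting the job to the cluster: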
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:553)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:89)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:198)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:90)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:90)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
at com.example.KafkaConsumer.main(KafkaConsumer.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$runMain(SparkSubmit.scala:782)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$anonfun$apply.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun$anonfun$apply.apply(DataSource.scala:537)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun.apply(DataSource.scala:537)
at org.apache.spark.sql.execution.datasources.DataSource$anonfun.apply(DataSource.scala:537)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:537)
... 17 more
This is the spark-submit command I use:
$SPARK_HOME/bin/spark-submit \
--master yarn \
--deploy-mode client \
--class com.example.KafkaConsumer \
--executor-cores 2 \
--executor-memory 512m \
--driver-memory 512m \
sample-kafka-consumer-0.0.1-SNAPSHOT.jar
My Java code:
package com.example;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaConsumer {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
            .builder()
            .appName("kafkaConsumerApp")
            .getOrCreate();
        Dataset<Row> ds = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "dog.mercadoanalitico.com.br:6667")
            .option("subscribe", "my-topic")
            .load();
    }
}
pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>sample-kafka-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<dependencies>
<!-- spark -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<!-- kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.1.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>local-maven-repo</id>
<url>file:///${project.basedir}/local-maven-repo</url>
</repository>
</repositories>
<build>
<!-- Include resources folder in the .jar -->
<resources>
<resource>
<directory>${basedir}/src/main/resources</directory>
</resource>
</resources>
<plugins>
<!-- Plugin to compile the source. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<!-- Plugin to include all the dependencies in the .jar and set the main class. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<!-- This filter is to workaround the problem caused by included signed jars.
java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
-->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.KafkaConsumer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
[Update] UBER-JAR
Below is the configuration used in pom.xml to generate the uber-jar:
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<!-- This filter is to workaround the problem caused by included signed jars.
java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
-->
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.example.KafkaConsumer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
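The kafka data source is an external module and is not available to Spark applications by default.
You have to define it as a dependency in your pom.xml (as you have done), but that is only the first step toward including it in your Spark application.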
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
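With that dependency in place, you have to decide whether to create a so-called uber-jar that bundles all the dependencies together (which results in a fairly large jar file and makes submission take longer), or to use the --packages (or the less flexible --jars) option to add the dependency at spark-submit time.
(There are other options, such as storing the required jars on Hadoop HDFS or using Hadoop distribution-specific ways of defining dependencies for Spark applications, but let's keep things simple.)
I'd recommend using --packages first and considering the other options only once that works.
Use spark-submit --packages to include the spark-sql-kafka-0-10 module as follows: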
spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0
Include the other command-line options as needed.
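Uber-Jar Approach
Including all the dependencies in a so-called uber-jar may not always work because of how the META-INF directory is handled.
For the kafka data source (and data sources in general) to work, you have to make sure that the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister files of all the data sources are merged (not replace, first, or whatever other strategy you use).
The kafka data source ships its own META-INF/services/org.apache.spark.sql.sources.DataSourceRegister, which registers org.apache.spark.sql.kafka010.KafkaSourceProvider as the data source provider for the kafka format.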
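One way to check whether the merge actually happened is to look inside the built jar before submitting it. The following is a minimal sketch, not part of Spark or of any build plugin: the class name ServiceFileCheck and its method are hypothetical, and it assumes the jar path is passed as the only argument.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

// Hypothetical helper: lists the providers registered in a jar's DataSourceRegister file.
public class ServiceFileCheck {
    static final String SERVICE_FILE =
        "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister";

    // Returns the provider class names found in the service file (empty if the file is absent).
    static List<String> registeredProviders(String jarPath) throws IOException {
        List<String> providers = new ArrayList<>();
        try (JarFile jar = new JarFile(jarPath)) {
            JarEntry entry = jar.getJarEntry(SERVICE_FILE);
            if (entry == null) {
                return providers;
            }
            try (InputStream in = jar.getInputStream(entry);
                 BufferedReader reader = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    int hash = line.indexOf('#');      // strip service-file comments
                    if (hash >= 0) {
                        line = line.substring(0, hash);
                    }
                    line = line.trim();
                    if (!line.isEmpty()) {
                        providers.add(line);
                    }
                }
            }
        }
        return providers;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: java ServiceFileCheck <path-to-uber-jar>");
            return;
        }
        List<String> providers = registeredProviders(args[0]);
        providers.forEach(System.out::println);
        boolean hasKafka =
            providers.contains("org.apache.spark.sql.kafka010.KafkaSourceProvider");
        System.out.println(hasKafka ? "kafka provider registered" : "kafka provider MISSING");
    }
}
```

If the Kafka provider is missing from the list while other providers are present, the service files were most likely overwritten rather than merged during packaging.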
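I had a similar issue; it started when we upgraded our Cloudera Spark version from 2.2 to 2.3.
The problem: the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister in my uber-jar was being overwritten by a similar file present in some other jar, so Spark could not find the KafkaSourceProvider entry in the DataSourceRegister file.
Resolution: modifying the pom.xml helped me.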
<configuration>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
</transformer>
</transformers>
</configuration>
For an uber-jar, adding the ServicesResourceTransformer to the shade plugin worked for me.
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
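To make the difference between merging the service files and keeping only one copy concrete, here is a small Java illustration. It is not the shade plugin's implementation: the class and method names (ServiceFileMerge, keepFirst, merge) are hypothetical, the provider lists are shortened examples, and which copy survives without a merge depends on the build tool and the order of the jars.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration of "merge" versus "keep one copy" for same-named service files.
public class ServiceFileMerge {

    // Only one of the same-named files survives; here, the first one encountered.
    static List<String> keepFirst(List<List<String>> serviceFiles) {
        return serviceFiles.isEmpty()
            ? new ArrayList<>()
            : new ArrayList<>(serviceFiles.get(0));
    }

    // Every provider line from every file is kept, in order, without duplicates.
    static List<String> merge(List<List<String>> serviceFiles) {
        Set<String> merged = new LinkedHashSet<>();
        for (List<String> file : serviceFiles) {
            merged.addAll(file);
        }
        return new ArrayList<>(merged);
    }

    public static void main(String[] args) {
        // Example contents of two DataSourceRegister files (shortened for illustration).
        List<String> fromSparkSql = Arrays.asList(
            "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat",
            "org.apache.spark.sql.execution.datasources.json.JsonFileFormat");
        List<String> fromSparkSqlKafka = Arrays.asList(
            "org.apache.spark.sql.kafka010.KafkaSourceProvider");
        List<List<String>> files = Arrays.asList(fromSparkSql, fromSparkSqlKafka);

        System.out.println("keepFirst: " + keepFirst(files));
        System.out.println("merge:     " + merge(files));
    }
}
```

With the single-copy strategy the Kafka provider line is lost whenever another jar's file is the one that survives, which is why format("kafka") can fail even though the Kafka classes themselves are inside the uber-jar.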
My approach was different: I specified the spark-sql-kafka package directly in the submit command:
.\bin\spark-submit --master local --class "org.myspark.KafkaStream" --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0 <path_to_jar>
Related: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#deploying
The top answer is correct. With sbt-assembly, this solved my problem:
assemblyMergeStrategy in assembly := {
case "reference.conf" => MergeStrategy.concat
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
case PathList("META-INF", xs@_*) => MergeStrategy.discard
case _ => MergeStrategy.first
}
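I ran into the same error, and it took me a couple of days to figure out. When you copy the dependency snippet from the Maven repository, in particular the one for "spark-sql-kafka", it contains the following line:
<scope>provided</scope>
The solution was to remove this line so that the dependency is used with the default "compile" scope. The same applies if you use SBT. If you have other dependencies with this line, it may be worth removing it there as well, just in case.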
I had the same problem, but with Gradle and shadowJar. It worked after adding:
shadowJar {
mergeServiceFiles()
}
assemble.dependsOn shadowJar
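I got the same error because I had excluded everything under META-INF in the shade plugin in order to fix the "overlapping resource" warnings:
<exclude>META-INF/**</exclude>
But the class loader needs these resources to know which DataSource implementations are registered.
So I removed this exclude, and it works fine for me.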
<resource>
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
</resource>
Hope this helps.
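To see at runtime which DataSourceRegister files the class loader can actually reach, a small diagnostic along these lines may help. It is only a sketch: the class name DataSourceRegisterLocator is hypothetical, and it relies on nothing more than the standard ClassLoader.getResources call, so it can run inside the driver or on its own.

```java
import java.io.IOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic: shows where DataSourceRegister service files are visible from.
public class DataSourceRegisterLocator {
    static final String SERVICE_FILE =
        "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister";

    // Returns the URL of every copy of the service file visible to the given class loader.
    static List<URL> locate(ClassLoader loader) throws IOException {
        return Collections.list(loader.getResources(SERVICE_FILE));
    }

    public static void main(String[] args) throws IOException {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        List<URL> found = locate(loader);
        if (found.isEmpty()) {
            System.out.println("No DataSourceRegister file is visible on the classpath.");
        }
        for (URL url : found) {
            System.out.println(url);
        }
    }
}
```

If none of the printed locations belongs to a jar whose service file lists the Kafka provider, the lookup for the kafka format has nothing to find.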