How to get Spark/Kafka org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 Dependency working in Maven pom file?
This question has been discussed before, but at the time of writing I do not have enough reputation to comment on Algomeister's solution (which ultimately did not work for me).
I have a Spark job that uses Kafka and Structured Streaming, so it requires a dependency on the spark-sql-kafka-0-10 module.
Jacek Laskowski has said that this package must be included via the spark-submit command-line options:
Structured Streaming support for Kafka is in a separate spark-sql-kafka-0-10 module (aka library dependency).
spark-sql-kafka-0-10 module is not included by default so you have to start spark-submit (and "derivatives" like spark-shell) with --packages command-line option to "install" it.
I have done that; below is my spark-submit command:
SPARK_KAFKA_VERSION=0.10 spark2-submit \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 \
--class MyApp.MainClassPath \
--master local[4] \
MySparkApp-0.0.1-jar-with-dependencies.jar
However, I don't think this is a good option: every time I run this jar, the dependency has to be downloaded again, and if for some reason the dependency is unavailable, my application will no longer run. I am using Maven as my package manager and have this package in my pom file, but it does not work.
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.1.0</version>
<scope>runtime</scope>
</dependency>
When I run the spark job with the "--packages" option removed, I get the following error:
Exception in thread "main" java.lang.ClassNotFoundException:
Failed to find data source: kafka. Please find packages at
http://spark.apache.org/third-party-projects.html
This is caused by:
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
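Spark resolves format("kafka") by scanning the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister service files on the classpath. If that lookup fails (a common cause is an uber-jar build in which one jar's DataSourceRegister file overwrites another's instead of being merged with it), Spark falls back to looking for a class literally named kafka.DefaultSource, which is exactly the exception shown above. For context, here is a minimal sketch, not the actual application code, of the kind of read that triggers that lookup (the broker address and topic are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class KafkaReadSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("KafkaReadSketch")
                .getOrCreate();

        // format("kafka") is resolved via the DataSourceRegister service file;
        // without it, Spark fails with "Failed to find data source: kafka".
        Dataset<Row> df = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092") // placeholder
                .option("subscribe", "mytopic")                      // placeholder
                .load();

        df.selectExpr("CAST(value AS STRING)")
                .writeStream()
                .format("console")
                .start()
                .awaitTermination();
    }
}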
So I tried to include the solution provided by Algomeister using the maven-shade plugin, but it did not work. I got the following error instead:
Exception in thread "main" java.lang.SecurityException: Invalid signature file digest for Manifest main attributes
at sun.security.util.SignatureFileVerifier.processImpl(SignatureFileVerifier.java:330)
at sun.security.util.SignatureFileVerifier.process(SignatureFileVerifier.java:263)
at java.util.jar.JarVerifier.processEntry(JarVerifier.java:318)
at java.util.jar.JarVerifier.update(JarVerifier.java:230)
at java.util.jar.JarFile.initializeVerifier(JarFile.java:383)
at java.util.jar.JarFile.getInputStream(JarFile.java:450)
at sun.misc.URLClassPath$JarLoader.getInputStream(URLClassPath.java:977)
at sun.misc.Resource.cachedInputStream(Resource.java:77)
at sun.misc.Resource.getByteBuffer(Resource.java:160)
at java.net.URLClassLoader.defineClass(URLClassLoader.java:454)
at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:229)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:695)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
It would be great if we could find a working solution for this. Thank you.
Use the Maven pom below, which uses the shade plugin. The example class comes from the spark-2.2.0 examples: JavaStructuredKafkaWordCount. Two parts of the shade configuration matter here: the filter excludes the signature files (META-INF/*.SF, *.DSA, *.RSA) that cause the "Invalid signature file digest" SecurityException above, and the AppendingTransformer merges the META-INF/services/org.apache.spark.sql.sources.DataSourceRegister entries from all jars so Spark can find the kafka source.
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>structured-kafka-word-count</groupId>
<artifactId>structured-kafka-word-count</artifactId>
<version>1.0.0</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.apache.spark.examples.sql.streaming.JavaStructuredKafkaWordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
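For reference, here is an abbreviated sketch of that example class, condensed from the Spark 2.2.0 examples (see the Spark source tree for the complete version). It reads Kafka record values as strings, splits them into words, and writes running counts to the console:

import java.util.Arrays;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

public final class JavaStructuredKafkaWordCount {
    public static void main(String[] args) throws Exception {
        // args: <bootstrap-servers> <subscribe-type> <topics>
        String bootstrapServers = args[0];
        String subscribeType = args[1]; // e.g. "subscribe"
        String topics = args[2];

        SparkSession spark = SparkSession.builder()
                .appName("JavaStructuredKafkaWordCount")
                .getOrCreate();

        // Read Kafka records and keep only the value column as strings.
        Dataset<String> lines = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", bootstrapServers)
                .option(subscribeType, topics)
                .load()
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());

        // Split each line into words and count occurrences.
        Dataset<Row> wordCounts = lines
                .flatMap((FlatMapFunction<String, String>) x ->
                        Arrays.asList(x.split(" ")).iterator(), Encoders.STRING())
                .groupBy("value")
                .count();

        // Print the running counts to the console.
        StreamingQuery query = wordCounts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        query.awaitTermination();
    }
}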
You can then run it with a spark-submit command like the one below.
spark-submit \
  --class org.apache.spark.examples.sql.streaming.JavaStructuredKafkaWordCount \
  --master local[2] \
  structured-kafka-word-count-1.0.0.jar localhost:9092 subscribe mytopic