为什么 Spark 应用程序失败并显示“ClassNotFoundException:无法找到数据源:kafka”作为带有 sbt 程序集的 uber-jar?
Why does Spark application fail with “ClassNotFoundException: Failed to find data source: kafka” as uber-jar with sbt assembly?
我正在尝试 运行 像 StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide 这样的样本。
我的密码是
package io.boontadata.spark.job1
import org.apache.spark.sql.SparkSession
object DirectKafkaAggregateEvents {
val FIELD_MESSAGE_ID = 0
val FIELD_DEVICE_ID = 1
val FIELD_TIMESTAMP = 2
val FIELD_CATEGORY = 3
val FIELD_MEASURE1 = 4
val FIELD_MEASURE2 = 5
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(s"""
|Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
| <brokers> is a list of one or more Kafka brokers
| <subscribeType> sample value: subscribe
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(bootstrapServers, subscribeType, topics) = args
val spark = SparkSession
.builder
.appName("boontadata-spark-job1")
.getOrCreate()
import spark.implicits._
// Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
// Generate running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
我添加了以下 sbt 文件:
build.sbt:
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
// META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
我还添加了project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
这将创建一个包含非 provided
jar 的 Uber jar。
我提交以下行:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
但我得到这个 运行时间错误:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:132)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:132)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
有没有办法知道没有找到哪个 class,以便我可以在 maven.org 存储库中搜索那个 class。
lookupDataSource
源代码似乎在 https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala 的第 543 行,但我找不到直接 link 与 Kafka 数据源...
完整的源代码在这里:https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
在我的例子中,我在使用 sbt 编译时也遇到了这个错误,原因是 sbt assembly
没有将 spark-sql-kafka-0-10_2.11
工件作为 fat jar 的一部分。
(非常欢迎在这里发表评论。依赖项没有指定范围,因此不应假定为"provided")。
所以我改为部署一个普通的(苗条的)jar 并将依赖项与 --jars
参数一起包含到 spark-submit。
为了将所有依赖项集中在一个地方,您可以将 retrieveManaged := true
添加到您的 sbt 项目设置中,或者您可以在 sbt 控制台中发出:
> set retrieveManaged := true
> package
这应该将所有依赖项带到 lib_managed
文件夹中。
然后你可以复制所有这些文件(使用 bash 命令你可以使用这样的东西
cd /path/to/your/project
JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)
spark-submit [other-args] target/your-app-1.0-SNAPSHOT.jar --jars "$JARLIST"
我这样试过,它对我有用。像这样提交,一旦有任何问题让我知道
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
我是通过下载jar文件到驱动系统解决的。从那里,我提供了 jar 以使用 --jar 选项进行提交。
还要注意的是,我将整个 spark 2.1 环境打包到我的 uber jar 中(因为我的集群仍在 1.6.1 上)出于某种原因,它在包含在 uber jar 中时没有被拾取。
spark-submit --jar /ur/path/spark-sql-kafka-0-10_2.11:2.1.0 --class
ClassNm --Other-Options YourJar.jar
我正在使用 spark 2.1 并面临同样的问题
我的解决方法是
1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
2) cd ~/.ivy2/jars
在这里,所有需要的 jar 现在都在这个文件夹中
3) 将此文件夹中的所有jar 复制到所有节点(可以创建一个特定的文件夹来保存它们)
4) 将文件夹名称添加到 spark.driver.extraClassPath
和 spark.driver.extraClassPath
中,例如spark.driver.extraClassPath=/opt/jars/*:your_other_jars
5 spark-submit --class ClassNm --Other-Options YourJar.jar
现在工作正常
问题出在 build.sbt
中的以下部分:
// META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
它表示应丢弃所有 META-INF
个整体,包括使数据源别名(例如 kafka
)起作用的 "code"。
但是 META-INF
文件对于 kafka
(以及流数据源的其他别名)的工作非常重要。
要使 kafka
别名起作用,Spark SQL 使用 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister 和以下条目:
org.apache.spark.sql.kafka010.KafkaSourceProvider
KafkaSourceProvider
is responsible to register kafka
alias with the proper streaming data source, i.e. KafkaSource.
只是为了检查真正的代码确实可用,但是注册别名的 "code" 不可用,您可以通过完全限定名称使用 kafka
数据源(不是别名)如下:
spark.readStream.
format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
load
由于缺少 kafka.bootstrap.servers
等选项,您会看到其他问题,但是...我们离题了。
一个解决方案是 MergeStrategy.concat
所有 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
(这将创建一个包含所有数据源的 uber-jar,包括 kafka
数据源)。
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
这是鉴于 Jacek Laskowski 的回答。
那些在 maven 上构建项目的人可以试试这个。
将下面提到的行添加到您的 maven-shade-plugin。
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
我以 pom 文件的插件代码为例来说明添加该行的位置。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
</resource>
</transformer>
</transformers>
<finalName>${project.artifactId}-${project.version}-uber</finalName>
</configuration>
</execution>
</executions>
</plugin>
请见谅我的格式化技巧。
我正在使用 gradle 作为构建工具和 shadowJar 插件来创建 uberJar。
解决方案只是添加一个文件
src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
到项目。
在此文件中,您需要逐行放置您使用的数据源的 class 名称,在本例中为 org.apache.spark.sql.kafka010.KafkaSourceProvider
(找到 class 名称例如 here)
原因是 Spark 在其内部依赖管理机制中使用了 java ServiceLoader。
完整示例 here.
虽然这是一个旧线程,但我在 Hortonworks 3.1.5 上使用 Pyspark 2.3.3 时遇到了这个问题,所以我认为它也许可以帮助其他人。
Spark Streaming 与 Kafka 2 的集成需要以下 jar。
注意:请根据Spark & Kafka的版本下载合适的jar。
我正在尝试 运行 像 StructuredKafkaWordCount. I started with the Spark Structured Streaming Programming guide 这样的样本。
我的密码是
package io.boontadata.spark.job1
import org.apache.spark.sql.SparkSession
object DirectKafkaAggregateEvents {
val FIELD_MESSAGE_ID = 0
val FIELD_DEVICE_ID = 1
val FIELD_TIMESTAMP = 2
val FIELD_CATEGORY = 3
val FIELD_MEASURE1 = 4
val FIELD_MEASURE2 = 5
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(s"""
|Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
| <brokers> is a list of one or more Kafka brokers
| <subscribeType> sample value: subscribe
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(bootstrapServers, subscribeType, topics) = args
val spark = SparkSession
.builder
.appName("boontadata-spark-job1")
.getOrCreate()
import spark.implicits._
// Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
// Generate running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
// Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
我添加了以下 sbt 文件:
build.sbt:
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
// META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
我还添加了project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
这将创建一个包含非 provided
jar 的 Uber jar。
我提交以下行:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
但我得到这个 运行时间错误:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$$anonfun$apply.apply(DataSource.scala:132)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun.apply(DataSource.scala:132)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
有没有办法知道没有找到哪个 class,以便我可以在 maven.org 存储库中搜索那个 class。
lookupDataSource
源代码似乎在 https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala 的第 543 行,但我找不到直接 link 与 Kafka 数据源...
完整的源代码在这里:https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
在我的例子中,我在使用 sbt 编译时也遇到了这个错误,原因是 sbt assembly
没有将 spark-sql-kafka-0-10_2.11
工件作为 fat jar 的一部分。
(非常欢迎在这里发表评论。依赖项没有指定范围,因此不应假定为"provided")。
所以我改为部署一个普通的(苗条的)jar 并将依赖项与 --jars
参数一起包含到 spark-submit。
为了将所有依赖项集中在一个地方,您可以将 retrieveManaged := true
添加到您的 sbt 项目设置中,或者您可以在 sbt 控制台中发出:
> set retrieveManaged := true
> package
这应该将所有依赖项带到 lib_managed
文件夹中。
然后你可以复制所有这些文件(使用 bash 命令你可以使用这样的东西
cd /path/to/your/project
JARLIST=$(find lib_managed -name '*.jar'| paste -sd , -)
spark-submit [other-args] target/your-app-1.0-SNAPSHOT.jar --jars "$JARLIST"
我这样试过,它对我有用。像这样提交,一旦有任何问题让我知道
./spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0 --class com.inndata.StructuredStreaming.Kafka --master local[*] /Users/apple/.m2/repository/com/inndata/StructuredStreaming/0.0.1SNAPSHOT/StructuredStreaming-0.0.1-SNAPSHOT.jar
我是通过下载jar文件到驱动系统解决的。从那里,我提供了 jar 以使用 --jar 选项进行提交。
还要注意的是,我将整个 spark 2.1 环境打包到我的 uber jar 中(因为我的集群仍在 1.6.1 上)出于某种原因,它在包含在 uber jar 中时没有被拾取。
spark-submit --jar /ur/path/spark-sql-kafka-0-10_2.11:2.1.0 --class ClassNm --Other-Options YourJar.jar
我正在使用 spark 2.1 并面临同样的问题 我的解决方法是
1) spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
2) cd ~/.ivy2/jars
在这里,所有需要的 jar 现在都在这个文件夹中
3) 将此文件夹中的所有jar 复制到所有节点(可以创建一个特定的文件夹来保存它们)
4) 将文件夹名称添加到 spark.driver.extraClassPath
和 spark.driver.extraClassPath
中,例如spark.driver.extraClassPath=/opt/jars/*:your_other_jars
5 spark-submit --class ClassNm --Other-Options YourJar.jar
现在工作正常
问题出在 build.sbt
中的以下部分:
// META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
它表示应丢弃所有 META-INF
个整体,包括使数据源别名(例如 kafka
)起作用的 "code"。
但是 META-INF
文件对于 kafka
(以及流数据源的其他别名)的工作非常重要。
要使 kafka
别名起作用,Spark SQL 使用 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister 和以下条目:
org.apache.spark.sql.kafka010.KafkaSourceProvider
KafkaSourceProvider
is responsible to register kafka
alias with the proper streaming data source, i.e. KafkaSource.
只是为了检查真正的代码确实可用,但是注册别名的 "code" 不可用,您可以通过完全限定名称使用 kafka
数据源(不是别名)如下:
spark.readStream.
format("org.apache.spark.sql.kafka010.KafkaSourceProvider").
load
由于缺少 kafka.bootstrap.servers
等选项,您会看到其他问题,但是...我们离题了。
一个解决方案是 MergeStrategy.concat
所有 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
(这将创建一个包含所有数据源的 uber-jar,包括 kafka
数据源)。
case "META-INF/services/org.apache.spark.sql.sources.DataSourceRegister" => MergeStrategy.concat
这是鉴于 Jacek Laskowski 的回答。
那些在 maven 上构建项目的人可以试试这个。 将下面提到的行添加到您的 maven-shade-plugin。
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
我以 pom 文件的插件代码为例来说明添加该行的位置。
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>
META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
</resource>
</transformer>
</transformers>
<finalName>${project.artifactId}-${project.version}-uber</finalName>
</configuration>
</execution>
</executions>
</plugin>
请见谅我的格式化技巧。
我正在使用 gradle 作为构建工具和 shadowJar 插件来创建 uberJar。 解决方案只是添加一个文件
src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
到项目。
在此文件中,您需要逐行放置您使用的数据源的 class 名称,在本例中为 org.apache.spark.sql.kafka010.KafkaSourceProvider
(找到 class 名称例如 here)
原因是 Spark 在其内部依赖管理机制中使用了 java ServiceLoader。
完整示例 here.
虽然这是一个旧线程,但我在 Hortonworks 3.1.5 上使用 Pyspark 2.3.3 时遇到了这个问题,所以我认为它也许可以帮助其他人。 Spark Streaming 与 Kafka 2 的集成需要以下 jar。
注意:请根据Spark & Kafka的版本下载合适的jar。