Spark and Kafka Streaming Integration
I want to integrate Spark Streaming with Kafka.
I am using Spark 3.0.0 / Kafka_2.12-2.6.0 / spark-streaming-kafka-0-10_2.12-2.4.0.jar.
I started spark-shell with the following command:
`./bin/spark-shell --jars spark-streaming-kafka-0-10_2.12-2.4.0.jar`
Then I ran the following lines in the Spark shell (Scala):
val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").load()
val counts = ds.groupBy("value").count()
val query = counts.writeStream.outputMode("complete").format("console")
query.start()
But I got the following error:
20/09/18 13:45:45 ERROR MicroBatchExecution: Query [id = a587f8d8-5c08-44ab-8901-4882cf87b2a3, runId = 0191ebde-cadf-473e-9221-796c64f39a2c] terminated with error
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.$anonfun$applyOrElse(MicroBatchExecution.scala:102)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.applyOrElse(MicroBatchExecution.scala:95)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.applyOrElse(MicroBatchExecution.scala:81)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:322)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 30 more
Exception in thread "stream execution thread for [id = a587f8d8-5c08-44ab-8901-4882cf87b2a3, runId = 0191ebde-cadf-473e-9221-796c64f39a2c]" java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.$anonfun$applyOrElse(MicroBatchExecution.scala:102)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.applyOrElse(MicroBatchExecution.scala:95)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.applyOrElse(MicroBatchExecution.scala:81)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:322)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 30 more
So I tried again as follows:
`val stream = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").option("startingOffsets", "earliest").load()`
stream.show()
But I got another error:
java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:64)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:313)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:63)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan(QueryExecution.scala:107)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans(QueryExecution.scala:199)
at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:199)
at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:95)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
at org.apache.spark.sql.Dataset.show(Dataset.scala:824)
at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
at org.apache.spark.sql.Dataset.show(Dataset.scala:792)
... 47 elided
Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 116 more
I successfully tested Kafka streaming on its own, and Spark socket streaming with netcat.
But I don't know how to solve these problems...
Could you tell me the solution? Thanks.
`spark.readStream.format("kafka")` is part of Structured Streaming, and the jar you included is not the right one. If you pass jar files as arguments, you also need to pass spark-sql-kafka-0-10_2.12-3.0.0.jar along with all of its dependencies.
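For reference, a sketch of what a full --jars invocation might look like. The missing class org.apache.spark.kafka010.KafkaConfigUpdater lives in the token-provider jar; the exact dependency list and versions below (spark-token-provider-kafka-0-10, kafka-clients, commons-pool2) are assumptions that you should verify against the POM of spark-sql-kafka-0-10_2.12-3.0.0:

`./bin/spark-shell --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-token-provider-kafka-0-10_2.12-3.0.0.jar,kafka-clients-2.4.1.jar,commons-pool2-2.6.2.jar`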
The simpler option is to use the --packages form below, which also takes care of pulling in all the dependencies:
bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
It takes a few seconds to pull down all the dependencies, and then it works fine:
scala> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").load()
df: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
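Once the package resolves, here is a minimal end-to-end sketch of the word-count query from the question, assuming the same local broker and topic "tp"; note that the binary value column is cast to a string before grouping:

// Read the topic as a stream (assumes a broker at localhost:9092 and a topic named "tp")
val ds = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "tp")
  .load()

// Kafka delivers key and value as binary; cast value to a string before aggregating
val counts = ds.selectExpr("CAST(value AS STRING) AS value")
  .groupBy("value")
  .count()

// "complete" mode re-emits the whole aggregate table to the console on every trigger
val query = counts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()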