Spark and Kafka Streaming Integration

I want to integrate Spark Streaming with Kafka.

I am using Spark 3.0.0 / Kafka_2.12-2.6.0 / spark-streaming-kafka-0-10_2.12-2.4.0.jar.

I started spark-shell with the following command:

`./bin/spark-shell --jars spark-streaming-kafka-0-10_2.12-2.4.0.jar`

Then I tried the following Spark code in the Scala shell:

    val ds = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").load()
    val counts = ds.groupBy("value").count()
    val query = counts.writeStream.outputMode("complete").format("console")
    query.start()

But I got the following error:

 20/09/18 13:45:45 ERROR MicroBatchExecution: Query [id = a587f8d8-5c08-44ab-8901-4882cf87b2a3, runId = 0191ebde-cadf-473e-9221-796c64f39a2c] terminated with error
 java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
    at org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan.toMicroBatchStream(KafkaSourceProvider.scala:466)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.$anonfun$applyOrElse(MicroBatchExecution.scala:102)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.applyOrElse(MicroBatchExecution.scala:95)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun.applyOrElse(MicroBatchExecution.scala:81)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:72)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:309)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren(TreeNode.scala:399)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:314)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:149)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:147)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:298)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan$lzycompute(MicroBatchExecution.scala:81)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.logicalPlan(MicroBatchExecution.scala:61)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:322)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 30 more
 Exception in thread "stream execution thread for [id = a587f8d8-5c08-44ab-8901-4882cf87b2a3, runId = 0191ebde-cadf-473e-9221-796c64f39a2c]" java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
    ... (same stack trace as above)

So I tried again, this time with a batch read:

    val stream = spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").option("startingOffsets", "earliest").load()
    stream.show()

But I got another error:

 java.lang.NoClassDefFoundError: org/apache/spark/kafka010/KafkaConfigUpdater
 at org.apache.spark.sql.kafka010.KafkaSourceProvider$.kafkaParamsForDriver(KafkaSourceProvider.scala:580)
 at org.apache.spark.sql.kafka010.KafkaRelation.buildScan(KafkaRelation.scala:64) 
 at org.apache.spark.sql.execution.datasources.DataSourceStrategy.apply(DataSourceStrategy.scala:313)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:63)
 at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
 at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
 at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
 at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
 at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
 at scala.collection.Iterator.foreach(Iterator.scala:941)
 at scala.collection.Iterator.foreach$(Iterator.scala:941)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
 at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
 at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
 at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
 at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
 at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:78)
 at scala.collection.TraversableOnce.$anonfun$foldLeft(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.$anonfun$foldLeft$adapted(TraversableOnce.scala:162)
 at scala.collection.Iterator.foreach(Iterator.scala:941)
 at scala.collection.Iterator.foreach$(Iterator.scala:941)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
 at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
 at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
 at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan(QueryPlanner.scala:75)
 at scala.collection.Iterator$$anon.nextCur(Iterator.scala:484)
 at scala.collection.Iterator$$anon.hasNext(Iterator.scala:490)
 at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
 at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:68)
 at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:330)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan(QueryExecution.scala:94)
 at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
 at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
 at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:94)
 at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:87)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan(QueryExecution.scala:107)
 at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase(QueryExecution.scala:133)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
 at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:133)
 at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:107)
 at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:100)
 at org.apache.spark.sql.execution.QueryExecution.$anonfun$writePlans(QueryExecution.scala:199)
 at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:381)
 at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$writePlans(QueryExecution.scala:199)
 at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:207)
 at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:95)
 at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
 at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId(SQLExecution.scala:87)
 at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:763)
 at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
 at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
 at org.apache.spark.sql.Dataset.head(Dataset.scala:2695)
 at org.apache.spark.sql.Dataset.take(Dataset.scala:2902)
 at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
 at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:824)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:783)
 at org.apache.spark.sql.Dataset.show(Dataset.scala:792)
 ... 47 elided
 Caused by: java.lang.ClassNotFoundException: org.apache.spark.kafka010.KafkaConfigUpdater
 at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
 at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
 ... 116 more

I have already run a Kafka streaming test successfully, and Spark socket streaming with netcat also works. But I don't know how to solve this problem... Could you tell me the solution? Thanks.

spark.readStream.format("kafka") is part of Structured Streaming, and the jar you included is the wrong one: spark-streaming-kafka-0-10 is the connector for the old DStream API, and its 2.4.0 version does not match your Spark 3.0.0 anyway. If you pass jar files as arguments, you need to pass spark-sql-kafka-0-10_2.12-3.0.0.jar and all of its dependencies as well.
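For reference, the missing class org.apache.spark.kafka010.KafkaConfigUpdater ships in the separate spark-token-provider-kafka-0-10 module in Spark 3.x, which is why adding the SQL connector jar alone is not enough. If you really want to stay with --jars, a rough sketch would look like the line below; the exact dependency versions are assumptions based on what spark-sql-kafka-0-10_2.12:3.0.0 typically resolves to, so check your Maven/Ivy cache for the precise jars:

    ./bin/spark-shell --jars spark-sql-kafka-0-10_2.12-3.0.0.jar,spark-token-provider-kafka-0-10_2.12-3.0.0.jar,kafka-clients-2.4.1.jar,commons-pool2-2.6.2.jar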

The simpler option is to use the --packages form below, which also takes care of pulling in all the dependencies.

`bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0`

It takes a few seconds to fetch all the dependencies, and then it works fine:

    scala> val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "tp").load()
    df: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
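From there your original count-by-value query should run too. A minimal sketch, reusing the broker address and topic name tp from your example; note that Kafka delivers key and value as binary, so you will usually want to cast value to a string before grouping:

    // assumes a spark-shell session, where `spark` is predefined
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "tp")
      .load()

    // cast the binary value column to a string so the counts are readable
    val counts = df.selectExpr("CAST(value AS STRING) AS value").groupBy("value").count()

    val query = counts.writeStream
      .outputMode("complete") // print the full running count table each micro-batch
      .format("console")
      .start()

    query.awaitTermination() // keep the query running until it is stopped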