Spark Kafka connection failed: No resolvable bootstrap urls given in bootstrap.servers
I'm trying to read a Kafka topic with Spark 3.0.2. I launch spark-shell with the following libraries (a sample invocation is sketched after the list):
- spark-sql-kafka-0-10_2.12-3.0.2.jar
- kafka-avro-serializer-6.2.0.jar
- kafka-clients-2.4.1.jar
- spark-streaming-kafka-0-10-assembly_2.12-3.0.2.jar
- spark-tags_2.12-3.0.2.jar
- spark-token-provider-kafka-0-10_2.12-3.0.2.jar
- commons-pool2-2.6.2.jar
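For reference, a typical way to pass these jars to spark-shell looks like the following. The /opt/jars directory is an assumption; point it at wherever the jars actually live:

# hypothetical jar location; --jars ships the jars to the executors as well as the driver
spark-shell \
  --jars /opt/jars/spark-sql-kafka-0-10_2.12-3.0.2.jar,/opt/jars/kafka-avro-serializer-6.2.0.jar,/opt/jars/kafka-clients-2.4.1.jar,/opt/jars/spark-streaming-kafka-0-10-assembly_2.12-3.0.2.jar,/opt/jars/spark-tags_2.12-3.0.2.jar,/opt/jars/spark-token-provider-kafka-0-10_2.12-3.0.2.jar,/opt/jars/commons-pool2-2.6.2.jar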
I get the following output, followed by the error stack trace:
-------------------------------------------
Batch: 0
-------------------------------------------
+-----+----------+----------+-------+--------+--------+
| COL1|CUSTOMSREF|MPSIDCCKEY|MPSCOMP|MPSCREF1|MPSCREF2|
+-----+----------+----------+-------+--------+--------+
+-----+----------+----------+-------+--------+--------+
21/07/21 10:14:30 WARN TaskSetManager: Lost task 4.0 in stage 4.0 (TID 20, 172.20.0.4, executor 1): org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:820)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:612)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.createConsumer(KafkaDataConsumer.scala:122)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.<init>(KafkaDataConsumer.scala:59)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:206)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool$ObjectFactory.create(InternalKafkaConsumerPool.scala:201)
at org.apache.commons.pool2.BaseKeyedPooledObjectFactory.makeObject(BaseKeyedPooledObjectFactory.java:60)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.create(GenericKeyedObjectPool.java:1041)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:342)
at org.apache.commons.pool2.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:265)
at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumerPool.borrowObject(InternalKafkaConsumerPool.scala:84)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.retrieveConsumer(KafkaDataConsumer.scala:554)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getOrRetrieveConsumer(KafkaDataConsumer.scala:539)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get(KafkaDataConsumer.scala:285)
at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:598)
at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:281)
at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:63)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run(WriteToDataSourceV2Exec.scala:438)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1411)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:477)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2(WriteToDataSourceV2Exec.scala:385)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:462)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:465)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:735)
... 38 more
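From the stack trace, the ConfigException is raised in ClientUtils.parseAndValidateAddresses, which happens when none of the hostnames in bootstrap.servers can be resolved on the machine constructing the consumer. Note that the failing task runs on an executor (172.20.0.4), not on the driver. As a minimal sketch, hostname resolution can be checked on each node like this, assuming kafka1 is the broker hostname used in the options below:

import java.net.{InetAddress, UnknownHostException}

// Try to resolve the bootstrap host exactly as the Kafka client would.
try {
  val addr = InetAddress.getByName("kafka1")
  println(s"kafka1 resolves to ${addr.getHostAddress}")
} catch {
  case _: UnknownHostException =>
    println("kafka1 does not resolve on this node; check /etc/hosts or DNS")
}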
Does anyone know how to fix this?
Here is the Spark code in Scala:
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Deserializer
import java.nio.file.{Files, Paths}
import org.apache.spark.sql.avro.functions._
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
// Read the Avro schema that describes the message values.
val schema = new String(Files.readAllBytes(Paths.get("/path/to/avro/schema.avsc")))
val topic = "topic_A"

// Kafka source over SASL_SSL; every executor must be able to resolve kafka1.
val streamDf = spark
  .readStream.format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9192")
  .option("subscribe", topic)
  .option("kafka.group.id", "id_A")
  .option("kafka.security.protocol", "SASL_SSL")
  .option("kafka.sasl.mechanism", "PLAIN")
  .option("kafka.ssl.truststore.location", "/path/to/truststore/certificate.jks")
  .option("kafka.ssl.truststore.password", "password")
  .option("kafka.sasl.jaas.config", """org.apache.kafka.common.security.plain.PlainLoginModule required username="user_A" password="password";""")
  .option("startingOffsets", "latest")
  .load()

// Decode the Avro payload and flatten it into columns.
val dataDf = streamDf.selectExpr("CAST(key AS STRING)", "value")
  .select(from_avro(col("value"), schema) as "data")
  .select("data.*")

// Print each micro-batch to the console every 3 seconds.
dataDf
  .writeStream
  .format("console")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("3 seconds"))
  .start()
Thanks a lot.
I solved the "No resolvable bootstrap urls" problem by adding the Kafka brokers' IP addresses to the /etc/hosts file on every node of the Spark cluster.
Previously I had only edited /etc/hosts on the master node, so the executors still could not resolve the broker hostname.
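Concretely, the entry added on every node looked like the following (the IP address here is a placeholder; use the broker's real address):

# /etc/hosts on every node of the Spark cluster (driver and all executors)
# <broker IP>   <hostname used in kafka.bootstrap.servers>
172.20.0.10     kafka1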
Thanks to @koiralo and @OneCricketeer for their suggestions.