Spark streaming kafka 找不到 Set 的领导者偏移量

Spark streaming kafka Couldn't find leader offsets for Set

我使用 spark streaming 'org.apache.spark:spark-streaming_2.10:1.6.1' 和 'org.apache.spark:spark-streaming-kafka_2.10:1.6.1' 连接到 kafka broker 版本 0.10.0.1。当我尝试此代码时:

def messages = KafkaUtils.createDirectStream(jssc,
            String.class,
            String.class,
            StringDecoder.class,
            StringDecoder.class,
            kafkaParams,
            topicsSet)

我收到了这个异常:

    INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelException
org.apache.spark.SparkException: Couldn't find leader offsets for Set([stream,0])
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
    at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
    at scala.util.Either.fold(Either.scala:97)
    at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
    at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
    at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
    at org.apache.spark.streaming.kafka.KafkaUtils$createDirectStream.call(Unknown Source)
    at org.codehaus.groovy.runtime.callsite.CallSiteArray.defaultCall(CallSiteArray.java:45)
    at org.codehaus.groovy.runtime.callsite.AbstractCallSite.call(AbstractCallSite.java:108)
    at com.privowny.classification.jobs.StreamingClassification.main(StreamingClassification.groovy:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

我试着在这个网站上搜索一些答案,但似乎没有答案,你能给我一些建议吗?题目stream不为空

我从经验中知道,如果 Spark 驱动程序无法使用代理的广告主机名(advertised.host.name in server.properties)访问 kafka 代理,则可能导致此错误消息的一件事。即使 spark 配置识别使用不同地址的 kafka 代理也是如此。所有代理的广告主机名都必须可以从 Spark 驱动程序访问。

这发生在我身上是因为集群在一个单独的 AWS 账户中运行,经纪人使用内部 DNS 记录来识别自己,这些必须被复制到另一个 AWS 账户。在此之前,我收到了此错误消息,因为 Spark 驱动程序无法联系代理以请求他们的最新偏移量,即使我们在 spark 配置中使用了代理的私有 IP 地址。

希望对某人有所帮助。

我也遇到了这个问题。所以你必须改变你的卡夫卡的一些配置。

转到您的 Kafka 配置和配置 listeners;

在套接字服务器设置部分,格式为:

listeners=PLAINTEXT://[hostname or IP]:[port]

例如:

listeners=PLAINTEXT://192.168.1.24:9092

我是来自HDP的运行 kafka,所以默认端口是6667而不是9092,当我将bootstrap.servers的端口切换到<hostname>:6667时,问题就解决了。