Apache Kafka:超时异常与/ Spark Streaming
Apache Kafka: Time Out Exception w/ Spark Streaming
背景:
我在 Scala 和 Kafka 中使用 Spark Streaming 程序。我的目的是将文件读取到 Kafka 并将这些消息发布到 Spark Streaming 应用程序以进行一些分析。
问题
但是,当我将文件通过管道传输到 Kafka 并启动我的 Streaming 应用程序以收听特定主题时,我在 Kafka 生产者控制台上看到了这些错误消息。
用于读取文件的命令:
C:\Kafka\bin\windows>kafka-console-producer --broker-list localhost:9092 --topic mytopic2 < C:\somefile.csv
错误:
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
我 运行 这个应用程序在我的 Windows 机器上本地运行,Kafka 服务器也在我的机器上本地运行。
Spark 应用程序类似于:
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("mytopic2").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
lines.foreachRDD((rdd, time) => {
// do something
}
我不确定关于 Kafka/Spark 的错误究竟意味着什么。
任何继续进行的指示将不胜感激。
该错误与 Spark/Spark 流式处理无关。
您的 Kafka 设置似乎有问题。
超时错误通常发生在 Zookeeper 设置出现问题时。你是否正确配置了你的动物园管理员?
确保设置正确。另外,请先尝试 运行 Kafka 附带的简单 Kafka 生产者和消费者脚本。
毕竟这是卡夫卡的问题。我怀疑这更多地与我下载的用于 Spark Streaming 的 Kafka 版本有关,而不是与 Kafka 设置本身有关。
我已经为 Spark Streaming 1.6.2 下载了 Kafka 0.10。0.x -> 这是我遇到超时错误的时候。
我发现这个 link: https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#linking,其中指出:“Kafka:Spark Streaming 1.6.2 与 Kafka 0.8.2.1. 兼容”。
所以,当我下载 0.8.2.1 时,它工作正常 - 我不再得到 "TimeOut Errors"。
背景: 我在 Scala 和 Kafka 中使用 Spark Streaming 程序。我的目的是将文件读取到 Kafka 并将这些消息发布到 Spark Streaming 应用程序以进行一些分析。
问题 但是,当我将文件通过管道传输到 Kafka 并启动我的 Streaming 应用程序以收听特定主题时,我在 Kafka 生产者控制台上看到了这些错误消息。
用于读取文件的命令:
C:\Kafka\bin\windows>kafka-console-producer --broker-list localhost:9092 --topic mytopic2 < C:\somefile.csv
错误:
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
[2016-09-04 10:08:42,122] ERROR Error when sending message to topic mytopic2 w
h key: null, value: 116 bytes with error: (org.apache.kafka.clients.producer.i
ernals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 109 record(s
expired due to timeout while requesting metadata from brokers for mytopic2-0
我 运行 这个应用程序在我的 Windows 机器上本地运行,Kafka 服务器也在我的机器上本地运行。
Spark 应用程序类似于:
val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
val topics = List("mytopic2").toSet
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)
lines.foreachRDD((rdd, time) => {
// do something
}
我不确定关于 Kafka/Spark 的错误究竟意味着什么。
任何继续进行的指示将不胜感激。
该错误与 Spark/Spark 流式处理无关。 您的 Kafka 设置似乎有问题。
超时错误通常发生在 Zookeeper 设置出现问题时。你是否正确配置了你的动物园管理员? 确保设置正确。另外,请先尝试 运行 Kafka 附带的简单 Kafka 生产者和消费者脚本。
毕竟这是卡夫卡的问题。我怀疑这更多地与我下载的用于 Spark Streaming 的 Kafka 版本有关,而不是与 Kafka 设置本身有关。
我已经为 Spark Streaming 1.6.2 下载了 Kafka 0.10。0.x -> 这是我遇到超时错误的时候。 我发现这个 link: https://spark.apache.org/docs/1.6.2/streaming-programming-guide.html#linking,其中指出:“Kafka:Spark Streaming 1.6.2 与 Kafka 0.8.2.1. 兼容”。
所以,当我下载 0.8.2.1 时,它工作正常 - 我不再得到 "TimeOut Errors"。