无法从 Spark Streaming 连接到 Kafka:org.apache.spark.SparkException:java.net.SocketTimeoutException
Cannot connect from Spark Streaming to Kafka: org.apache.spark.SparkException: java.net.SocketTimeoutException
我正在尝试使用 Spark Streaming 直接流读取 Kafka 主题,但收到以下错误:
INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
我有 Kafka 0.7.1 和 Spark 1.5.2。
我正在使用以下代码:
val ssc : StreamingContext = new StreamingContext(sparkContext, Seconds(60))
val topicsSet = Set("myTopic")
val kafkaParams = Map[String, String]
("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
我确定该主题已经存在,因为其他应用程序正在正确读取它。
尽量不要使用旧版本的kafka,在你的情况下是(0.7.1)。
如果您有充分的理由使用 0.7.1,请告诉我。
查看您的异常,应用程序似乎无法连接到 kafka 代理。
我已经使用这个直接流 api 从 kafka 0.8.2 读取数据。
https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
希望这能解决您的问题。
感谢和问候,
维卡斯盖特
我正在尝试使用 Spark Streaming 直接流读取 Kafka 主题,但收到以下错误:
INFO consumer.SimpleConsumer: Reconnect due to socket error: java.net.SocketTimeoutException
ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
org.apache.spark.SparkException: java.net.SocketTimeoutException
java.net.SocketTimeoutException
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:422)
我有 Kafka 0.7.1 和 Spark 1.5.2。
我正在使用以下代码:
val ssc : StreamingContext = new StreamingContext(sparkContext, Seconds(60))
val topicsSet = Set("myTopic")
val kafkaParams = Map[String, String]
("metadata.broker.list" -> "mybrokerhostname1:9092,mybrokerhostname2:9092")
val stream = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
我确定该主题已经存在,因为其他应用程序正在正确读取它。
尽量不要使用旧版本的kafka,在你的情况下是(0.7.1)。 如果您有充分的理由使用 0.7.1,请告诉我。 查看您的异常,应用程序似乎无法连接到 kafka 代理。
我已经使用这个直接流 api 从 kafka 0.8.2 读取数据。 https://github.com/koeninger/kafka-exactly-once/blob/master/src/main/scala/example/TransactionalPerBatch.scala
希望这能解决您的问题。
感谢和问候, 维卡斯盖特