No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig)
I am writing some code in which I am trying to consume messages with Kafka and Spark, but my code does not work. Here is my code:
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util._

object Smack_Kafka_Spark extends App {
  def main(args: Array[String]) {
    val kafkaBrokers = "localhost:2181"
    val kafkaOpTopic = "test"
    /*val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")*/
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:2181")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    var spark: SparkSession = null
    val textFile: RDD[String] = spark.sparkContext.textFile("dataset.txt")
    textFile.foreach(record => {
      val data = record.toString
      val message = new ProducerRecord[String, String](kafkaOpTopic, null, data)
      producer.send(message)
    })
    producer.close()
  }
}
Here is the error I get:
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NullPointerException
at Smack_Kafka_Spark$.main(Smack_Kafka_Spark.scala:25)
at Smack_Kafka_Spark.main(Smack_Kafka_Spark.scala)
Any help would be greatly appreciated!
You are getting a NullPointerException because your SparkSession is null. Create it as shown below:
val spark: SparkSession = SparkSession.builder()
  .appName("Smack_Kafka_Spark")
  .master("local[*]")
  .getOrCreate()
Now read your text file as shown below:
val textFile: Dataset[String] = spark.read.textFile("dataset.txt")
Another issue you may run into when you run your program is:
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
KafkaProducer is not serializable. You need to move the creation of your KafkaProducer instance inside foreachPartition. Please check this SO post.
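For reference, here is a minimal sketch of that change (not tested against your setup): the producer is built inside foreachPartition, so it is created on each executor instead of being serialized from the driver. The broker address localhost:9092 is an assumption; replace it with your actual Kafka broker.

// Minimal sketch: create the KafkaProducer on the executors inside foreachPartition
// so that Spark never has to serialize it. localhost:9092 is an assumed broker address.
textFile.rdd.foreachPartition { partition =>
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092") // assumption: adjust to your broker
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  // one producer per partition, created where the records live
  val producer = new KafkaProducer[String, String](props)
  partition.foreach { record =>
    producer.send(new ProducerRecord[String, String](kafkaOpTopic, null, record))
  }
  producer.close()
}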