KafkaUtils.createDirectStream 到字符串对象 Spark

KafkaUtils.createDirectStream to a String object Spark

我有一个 Kafka Producer,它从一个目录读取并将文件的内容写入一个主题

  def main(args: Array[String]) {
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = Array("quickstart.cloudera:9092", "test","10","10")
val directoryPath = "/home/cloudera/Documents/config/"
// Zookeeper connection properties
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

val myDirectory= new File(directoryPath)
var lines =""
for (file <- myDirectory.listFiles) {
   lines = scala.io.Source.fromFile(file).mkString

   val message = new ProducerRecord[String, String](topic, null, lines)
   producer.send(message)
   print(lines)
   Thread.sleep(1000)
}

同样,我使用 spark Direct streaming 作为我的消费者

val lines = KafkaUtils.createDirectStream[Array[Byte], String, DefaultDecoder, StringDecoder](ssc, kafkaConf, Set(topic)).map(_._2)

val str = lines.print(10)

我可以打印文件的内容。 我正在使用单一主题。 我必须从这个 DStream 中获取 RDD,并将整个内容放入一个字符串对象中,以便我可以将它传递给一个方法。 有人可以帮忙吗?

您要找的API是:

DStream.foreachRDD(func)

它将函数 func 应用于从流生成的每个 RDD。 因此,对于您的用例,我可能会编写以下代码:

lines.foreachRDD(rdd => {
  val data = rdd.collect().mkString("\n")
  println(data)
})

请注意,由于此代码在驱动程序进程上运行,您必须确保它有足够的资源来处理给定的文件。 通常,应该使用这个API将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或者通过网络将其写入数据库。

您可以在 Spark's programming guide.

上进一步阅读 DStreams 的其他输出操作