Spark Streaming - 将 json 格式的消息 Dstream 到 DataFrame

Spark Streaming - Dstream messages in json format to DataFrame

我正在尝试通过 Apache Spark Streaming 阅读 Kafka 主题,但无法弄清楚如何将 DStream 中的数据转换为 DataFrame,然后存储在临时 table 中。 Kafka 中的消息采用 Avro 格式,由 Kafka JDBC Connect 从数据库创建。我有下面的代码,在执行 spark.read.json 将 json 读取到数据帧之前,它工作正常。

package consumerTest


import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._

import scala.util.parsing.json.{JSON, JSONObject}

object Consumer {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder
      .master("local")
      .appName("my-spark-app")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate();

    import spark.implicits._


    val ssc = new StreamingContext(spark.sparkContext, Seconds(10))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "<kafka-server>:9092",
      "key.deserializer" -> classOf[KafkaAvroDeserializer],
      "value.deserializer" -> classOf[KafkaAvroDeserializer],
      "group.id" -> "sakwq",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false",
      "schema.registry.url" -> "http://<schema-registry>:8181"
    )

    val topics = Array("cdcemployee")

    val stream = KafkaUtils.createDirectStream[String, Object](
      ssc,
      PreferConsistent,
      Subscribe[String, Object](topics, kafkaParams)
    )


    val data = stream.map(record => {
      println(record.value.toString())
      record.value
      val df = spark.read.json(record.value.toString())

    })


    data.print();



    ssc.start()
    ssc.awaitTermination()
  }


}

我在执行 val df = spark.read.json(record.value.toString())

行时遇到空指针异常
18/05/10 09:49:11 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
    at consumerTest.Consumer$.$anonfun$main(Consumer.scala:63)
    at consumerTest.Consumer$.$anonfun$main$adapted(Consumer.scala:60)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1354)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/05/10 09:49:11 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:135)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:133)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:689)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:645)
    at consumerTest.Consumer$.$anonfun$main(Consumer.scala:63)
    at consumerTest.Consumer$.$anonfun$main$adapted(Consumer.scala:60)
    at scala.collection.Iterator$$anon.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon.next(Iterator.scala:393)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1354)
    at org.apache.spark.rdd.RDD$$anonfun$take$$anonfun.apply(RDD.scala:1354)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
    at org.apache.spark.SparkContext$$anonfun$runJob.apply(SparkContext.scala:2069)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

此外,如果我删除 spark.read.json 语句

,则执行语句 println(record.value.toString()) 时打印的示例数据
    {"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": "300", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 182241, "CDCTIMESTAMP": "2018-03-26 18:04:44:776 - 04:00", "CDCCHANGESEQ": 14, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": null, "PostalCode": null, "DeptCode": "", "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 197086, "CDCTIMESTAMP": "2018-03-27 11:18:48:022 - 04:00", "CDCCHANGESEQ": 15, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363712, "CDCTIMESTAMP": "2018-04-04 15:30:46:551 - 04:00", "CDCCHANGESEQ": 16, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 57, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "New York", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 363785, "CDCTIMESTAMP": "2018-04-04 15:35:11:492 - 04:00", "CDCCHANGESEQ": 17, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 2, "EmpNum": 57, "LastName": "bobba2s", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "San Diego", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270}
{"CDCTRANSACTIONID": 364688, "CDCTIMESTAMP": "2018-04-04 16:39:05:602 - 04:00", "CDCCHANGESEQ": 18, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 1, "EmpNum": 59, "LastName": "Bobba", "FirstName": "Saikrishna Teja", "Address": "9220 Bothwell St", "Address2": "", "City": "Raleigh", "State": "NC", "PostalCode": "27617", "DeptCode": "300", "Position": "", "HomePhone": "919 931-5737", "WorkPhone": "919 931-5737", "VacationDaysLeft": 10, "SickDaysLeft": 5, "StartDate": 16979, "Birthdate": 7270}
{"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Francisco", "State": "CA", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 384368, "CDCTIMESTAMP": "2018-04-05 15:43:15:478 - 04:00", "CDCCHANGESEQ": 19, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 3, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "Raleigh", "State": "NC", "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}
{"CDCTRANSACTIONID": 650254, "CDCTIMESTAMP": "2018-04-18 16:19:35:669 - 04:00", "CDCCHANGESEQ": 20, "CDCCONTINUATIONPOSITION": 0, "CDCARRAYINDEX": 0, "CDCFRAGMENT": 0, "CDCOPERATION": 4, "EmpNum": 59, "LastName": null, "FirstName": null, "Address": null, "Address2": null, "City": "San Diego", "State": null, "PostalCode": null, "DeptCode": null, "Position": null, "HomePhone": null, "WorkPhone": null, "VacationDaysLeft": null, "SickDaysLeft": null, "StartDate": null, "Birthdate": null}

任何人都可以帮助我如何将其转换为数据框并将其临时存储在 table 中吗?

编辑:

stream 包含每个 interval 时间的 RDD,因此对于每个 interval 时间,您可以将 rdd 转换为 datafarme 作为

stream.foreachRDD(rddRaw => {
  val rdd = rddRaw.map(_.value.toString) // or rddRaw.map(_._2)
  val df = spark.read.json(rdd)
})

这应该会为您提供预期的数据框。

希望对您有所帮助!

Pyspark

Json数据:

{"timestamp": "1571053218000","t1": "55.23","t2": "10","t3": "ON"}

{"timestamp": "1571053278000","t1": "63.23","t2": "11","t3": "OFF"}

{"timestamp": "1571053338000","t1": "73.23","t2": "12","t3": "ON"}

{"timestamp": "1571053398000","t1": "83.23","t2": "13","t3": "ON"}

从上面读取的 Pyspark 代码 json 数据:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.types import IntegerType, LongType, DecimalType,StructType, StructField, StringType
from pyspark.sql import Row
from pyspark.sql.functions import col
import pyspark.sql.functions as F
from pyspark.sql import Window

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)
ssc = StreamingContext(sc, 5)

stream_data = ssc.textFileStream("/filepath/")

def readMyStream(rdd):
  if not rdd.isEmpty():
    df = spark.read.json(rdd)
    print('Started the Process')
    print('Selection of Columns')
    df = df.select('t1','t2','t3','timestamp').where(col("timestamp").isNotNull())
    df.show()

stream_data.foreachRDD( lambda rdd: readMyStream(rdd) )
ssc.start()
ssc.stop()