How do I read data from a TCP socket and save it to a DataFrame in Spark Scala?

I am trying to read data from a TCP socket and append it to a DataFrame. I receive the data and convert it to a Seq(), but when I use foreach to append the rows to a DataFrame, I run into a problem. This is my code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CustomReceiver {
  def main(args: Array[String]): Unit = {
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 5 second batch size
    val spark: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("CustomReceiver")
      .getOrCreate()

    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
    import spark.implicits._

    /* Format of one line of data from the socket:
       number1, 20210621090303, RadiusMessage, Stop, 84602496347, v241.66.85.130 */

    val linesData1 = ssc.receiverStream(new CustomReceiver("localhost", 11000))
    linesData1.flatMap(_.split(" ").map(_.trim))
    linesData1.foreachRDD { rdd =>
      rdd.foreach { line =>
        val arrraLine = line.split(",").toList
        // OK, arrraLine: List(number1, 20210621090303, RadiusMessage, Stop, 84602496347, 241.66.85.130)
        val testRDD = Seq(arrraLine).map(x => (x(0), x(1), x(2), x(3), x(4)))
        // OK, testRDD: List((number1,20210621090303,RadiusMessage,Stop,84602496347))

        // The problem occurs here
        val testDF = testRDD.toDF("cot1", "cot2", "cot3", "cot4", "cot5")
        testDF.show()
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}

This is the error I get when I run it:

java.lang.NullPointerException
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at Cl.CustomReceiver$.$anonfun$main(CustomeReceiver.scala:52)
    at Cl.CustomReceiver$.$anonfun$main$adapted(CustomeReceiver.scala:45)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:25)
    at org.apache.spark.rdd.RDD.$anonfun$foreach(RDD.scala:1012)
    at org.apache.spark.rdd.RDD.$anonfun$foreach$adapted(RDD.scala:1012)
    at org.apache.spark.SparkContext.$anonfun$runJob(SparkContext.scala:2236)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

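What you are trying to do is not really recommended. If you want to work with DataFrames, use Spark Structured Streaming. You are trying to create a DataFrame inside a foreach operation on an RDD. If you are using the old Spark Streaming API, work with RDDs instead.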
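
To illustrate the RDD-only route: the stack trace shows toDF being reached from inside rdd.foreach, that is, inside a task (ResultTask.runTask), where the SparkSession implicits are not usable. A minimal sketch that stays with DStream/RDD operations could look like the following. The names RddOnlySketch, Record and parseLine are made up for this example, and socketTextStream is used only as a stand-in for the CustomReceiver from the question.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RddOnlySketch {
  // Hypothetical record type holding the first five comma-separated fields.
  final case class Record(cot1: String, cot2: String, cot3: String, cot4: String, cot5: String)

  // Pure helper: returns None when a line has fewer than five fields.
  def parseLine(line: String): Option[Record] = {
    val f = line.split(",").map(_.trim)
    if (f.length >= 5) Some(Record(f(0), f(1), f(2), f(3), f(4))) else None
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("RddOnlySketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Assumption: a plain text socket replaces the question's CustomReceiver.
    val lines = ssc.socketTextStream("localhost", 11000)

    // Only plain Scala values are built inside the tasks; no DataFrame is involved.
    val records = lines.flatMap(line => parseLine(line).toList)

    // Prints the first few records of every batch on the driver.
    records.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Lines with too few fields are silently dropped here; whether that is acceptable depends on the data.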

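You can create a DataFrame inside foreachRDD, which hands you one RDD per micro-batch, but it is not a good idea. If you test it, you will see many Spark stages just to create the new DataFrame, and this happens for every micro-batch. With Structured Streaming you can create the DataFrame directly.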
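
As a rough sketch of the Structured Streaming route, assuming the data arrives as newline-terminated text and that Spark's built-in socket source (which the Spark documentation describes as intended for testing) can stand in for the CustomReceiver from the question. The names StructuredSocketSketch and toColumns are made up for this example; the column names cot1 to cot5 are taken from the question.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, split, trim}

object StructuredSocketSketch {
  // Splits the single string column "value" on commas and keeps the
  // first five fields as named, trimmed columns.
  def toColumns(lines: DataFrame): DataFrame = {
    val fields = split(col("value"), ",")
    val names = Seq("cot1", "cot2", "cot3", "cot4", "cot5")
    val columns = names.zipWithIndex.map { case (name, i) =>
      trim(fields.getItem(i)).as(name)
    }
    lines.select(columns: _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("StructuredSocketSketch")
      .getOrCreate()

    // The socket source yields a streaming DataFrame with one column, "value".
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 11000)
      .load()

    // Write every micro-batch to the console, similar to testDF.show().
    val query = toColumns(lines).writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}
```

Because toColumns only uses column expressions, the same function also works on an ordinary (non-streaming) DataFrame, which makes it easy to try out on a few sample lines.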