How to get data from a TCP socket into a dataframe in Spark Scala?
I am trying to read data from a TCP socket and append it to a dataframe. I receive the data and convert each line to a Seq(), but when I use foreach to append it to a dataframe, there is a problem. This is my code:
object CustomReceiver {
  def main(args: Array[String]): Unit = {
    StreamingExamples.setStreamingLogLevels()
    // Create the context with a 5 second batch size
    val spark: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("CustomReceiver")
      .getOrCreate()
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(5))
    import spark.implicits._

    /* format of a line from the socket:
       number1, 20210621090303, RadiusMessage, Stop, 84602496347, 241.66.85.130 */
    val linesData1 = ssc.receiverStream(new CustomReceiver("localhost", 11000))
    linesData1.flatMap(_.split(" ").map(_.trim)) // note: this result is never used
    linesData1.foreachRDD { rdd =>
      rdd.foreach { line =>
        val arrayLine = line.split(",").toList
        // OK, arrayLine is: List(number1, 20210621090303, RadiusMessage, Stop, 84602496347, 241.66.85.130)
        val testRDD = Seq(arrayLine).map(x => (x(0), x(1), x(2), x(3), x(4)))
        // OK, testRDD is: List((number1,20210621090303,RadiusMessage,Stop,84602496347))
        val testDF = testRDD.toDF("cot1", "cot2", "cot3", "cot4", "cot5")
        // the problem occurs here
        testDF.show()
      }
    }
    ssc.start()
    ssc.awaitTermination()
  }
}
This is the error I get when I run it:
java.lang.NullPointerException
at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
at Cl.CustomReceiver$.$anonfun$main(CustomeReceiver.scala:52)
at Cl.CustomReceiver$.$anonfun$main$adapted(CustomeReceiver.scala:45)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at org.apache.spark.util.CompletionIterator.foreach(CompletionIterator.scala:25)
at org.apache.spark.rdd.RDD.$anonfun$foreach(RDD.scala:1012)
at org.apache.spark.rdd.RDD.$anonfun$foreach$adapted(RDD.scala:1012)
at org.apache.spark.SparkContext.$anonfun$runJob(SparkContext.scala:2236)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
What you are trying to do is not really recommended. If you want to work with DataFrames, use Spark Structured Streaming. You are trying to create a DF inside a foreach action on the RDD: that closure runs on the executors, where the SparkSession (and spark.implicits._) is not available, which is why toDF throws a NullPointerException. If you stay on the old Spark Streaming API, work with RDDs instead.
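If you do need to stay on DStreams, here is a minimal sketch of a workaround, assuming the same linesData1 stream, CSV format, and column names as in your question. The key is to build the DataFrame on the driver, inside foreachRDD, rather than inside rdd.foreach:

linesData1.foreachRDD { rdd =>
  // This block runs on the driver, once per micro-batch,
  // so the SparkSession and spark.implicits._ are available here.
  val tupleRDD = rdd.map { line =>
    val cols = line.split(",").map(_.trim)
    (cols(0), cols(1), cols(2), cols(3), cols(4))
  }
  if (!tupleRDD.isEmpty()) { // skip empty micro-batches
    tupleRDD.toDF("cot1", "cot2", "cot3", "cot4", "cot5").show()
  }
}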
You can create a DF inside `foreachRDD`, one per micro-batch, but it is not a good idea. If you test it, you will see a lot of Spark stages being created for each new DF, for every micro-batch... With Structured Streaming you can create the dataframe directly.
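For example, a sketch with Structured Streaming's built-in socket source, reusing the host, port, and column names from your question (your CustomReceiver is no longer needed; `show()` is replaced by the console sink, since `show()` does not work on streaming dataframes):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{split, trim}

object SocketToDataFrame {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SocketToDataFrame")
      .getOrCreate()
    import spark.implicits._

    // Read lines from the TCP socket; each row arrives in a single "value" column.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "11000")
      .load()

    // Split each CSV line into the five columns used in the question.
    val parts = split($"value", ",")
    val df = lines.select(
      trim(parts.getItem(0)).as("cot1"),
      trim(parts.getItem(1)).as("cot2"),
      trim(parts.getItem(2)).as("cot3"),
      trim(parts.getItem(3)).as("cot4"),
      trim(parts.getItem(4)).as("cot5")
    )

    // Print each micro-batch to the console.
    val query = df.writeStream
      .format("console")
      .outputMode("append")
      .start()

    query.awaitTermination()
  }
}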