Create a Spark DataFrame by reading a Scala sequence with different datatypes
I want to create a Spark DataFrame by reading a Seq in Scala. The Seq holds values of type String, DataFrame, Long, and Date.
I tried the approach below, but it fails with an error; this is probably not the right way to handle the problem.
val Total_Record_Count = TotalRecordDF.count // getting the total record count by reading a dataframe
val Rejected_Record_Count = rejectDF.count // getting the rejected record count by reading a dataframe
val Batch_Run_ID = spark.range(1).select(unix_timestamp as "current_timestamp")
case class JobRunDetails(Job_Name: String, Batch_Run_ID: DataFrame, Source_Entity_Name: String, Total_Record_Count: Long, Rejected_Record_Count: Long, Reject_Record_File_Path: String, Load_Date: String)
val inputSeq = Seq(JobRunDetails("HIT", Batch_Run_ID, "HIT", Total_Record_Count, Rejected_Record_Count, "blob.core.windows.net/feedlayer", Load_Date))
I tried:
val df = sc.parallelize(inputSeq).toDF()
but it throws "java.lang.UnsupportedOperationException: No Encoder found for org.apache.spark.sql.DataFrame".
I just want to create a DataFrame by reading the sequence. Any help would be appreciated.
Note: I am using Spark 2.3 on Databricks.
Usually we build case classes from Java/Scala primitive types; I haven't seen anyone create a case class with a DataFrame as one of its members. Spark needs an Encoder for every field of a case class, and there is no Encoder for DataFrame, which is exactly what the exception is telling you.
If I understood your requirement correctly, this is what you are looking for:
case class JobRunDetails(Job_Name: String, Batch_Run_ID: Int, Source_Entity_Name: String, Total_Record_Count: Long, Rejected_Record_Count: Long, Reject_Record_File_Path: String, Load_Date: String)
//defined class JobRunDetails
import spark.implicits._
import org.apache.spark.sql.functions.unix_timestamp // for unix_timestamp below
import org.apache.spark.rdd.RDD // for the RDD type annotation
val Total_Record_Count = 100 // stand-in for TotalRecordDF.count, which already returns a Long
val Rejected_Record_Count = 200 // stand-in for rejectDF.count
// pull the single timestamp value out of the DataFrame as a plain Int
val Batch_Run_ID = spark.range(1).select(unix_timestamp as "current_timestamp").take(1).head.get(0).toString().toInt
val Load_Date = "2019-27-07"
val inputRDD: RDD[JobRunDetails] = spark.sparkContext.parallelize(Seq(JobRunDetails("HIT", Batch_Run_ID, "HIT", Total_Record_Count, Rejected_Record_Count, "blob.core.windows.net/feedlayer", Load_Date)))
inputRDD.toDF().show
/**
import spark.implicits._
Total_Record_Count: Int = 100
Rejected_Record_Count: Int = 200
Batch_Run_ID: Int = 1564224156
Load_Date: String = 2019-27-07
inputRDD: org.apache.spark.rdd.RDD[JobRunDetails] = ParallelCollectionRDD[3] at parallelize at command-330223868839989:6
*/
+--------+------------+------------------+------------------+---------------------+-----------------------+----------+
|Job_Name|Batch_Run_ID|Source_Entity_Name|Total_Record_Count|Rejected_Record_Count|Reject_Record_File_Path| Load_Date|
+--------+------------+------------------+------------------+---------------------+-----------------------+----------+
| HIT| 1564224156| HIT| 100| 200| blob.core.windows...|2019-27-07|
+--------+------------+------------------+------------------+---------------------+-----------------------+----------+
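As a side note, on Spark 2.x you do not strictly need the detour through an RDD: with import spark.implicits._ in scope, .toDF() is available directly on a local Seq of case-class instances. A minimal sketch, reusing the same case class and the stand-in values from above:

import spark.implicits._
// build the single-row DataFrame straight from the Seq; no parallelize needed
val df = Seq(JobRunDetails("HIT", Batch_Run_ID, "HIT", Total_Record_Count, Rejected_Record_Count, "blob.core.windows.net/feedlayer", Load_Date)).toDF()
df.show(false)

This yields the same one-row DataFrame as the RDD version. Similarly, if Batch_Run_ID only needs the current epoch seconds, (System.currentTimeMillis / 1000).toInt avoids launching a Spark job just to evaluate unix_timestamp.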