
Create a Sequence of case class objects from a Spark DataFrame

How can I iterate over the rows of a Spark DataFrame and add them to a Seq of case class objects?

DF1:

val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

Case class:

case class ValuePerNumber(num:String, wrd:String, defaultID:Int, size: Long=0) {}
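Since `size` has a default of `0`, the way positional arguments line up in the constructor matters for getting the expected output below. A quick plain-Scala check (no Spark involved):

```scala
// Plain Scala sketch: how positional arguments map onto the fields,
// including the defaulted `size`.
case class ValuePerNumber(num: String, wrd: String, defaultID: Int, size: Long = 0)

// Omitting the fourth argument: the third positional argument lands in
// defaultID, and size falls back to its default of 0.
val threeArgs = ValuePerNumber("202003101750", "202003101700", 122)
println(threeArgs)  // ValuePerNumber(202003101750,202003101700,122,0)

// Passing defaultID = 0 explicitly puts the value into `size`,
// as in the expected output below.
val fourArgs = ValuePerNumber("202003101750", "202003101700", 0, 122)
println(fourArgs)   // ValuePerNumber(202003101750,202003101700,0,122)
```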

Expected output:

Seq(ValuePerNumber("202003101750", "202003101700",0, 122), ValuePerNumber("202003101800", "202003101700",0, 12), ValuePerNumber("202003101750", "202003101700",0, 42)) 

defaultID can be set to 0 in every case. I'm not sure how to approach and solve this; any solution or suggestion would be greatly appreciated!

I tried the following:

val x = someDF.as[ValuePerNumber].collect()

and got the following error:

org.apache.spark.sql.AnalysisException: cannot resolve '`num`' given input columns: [number, word, value];


You can map the DataFrame to a Dataset[ValuePerNumber] and collect it as a Seq:

val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

val result = someDF
  .map(r => ValuePerNumber(r.getAs[String](0), r.getAs[String](1), 0, r.getAs[Int](2)))
  .collect().toSeq
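The row-to-case-class shaping can be checked in plain Scala on the raw tuples behind `someDF` (no Spark session required); here `defaultID` is pinned to 0 and the third tuple element goes into `size`, matching the expected output in the question:

```scala
// Plain Scala (no Spark needed): the same shaping applied to the raw tuples.
case class ValuePerNumber(num: String, wrd: String, defaultID: Int, size: Long = 0)

val rows = Seq(
  ("202003101750", "202003101700", 122),
  ("202003101800", "202003101700", 12),
  ("202003101750", "202003101700", 42)
)

// defaultID is fixed at 0 and the Int value widens to Long for `size`.
val result = rows.map { case (num, wrd, value) => ValuePerNumber(num, wrd, 0, value) }
println(result.head)  // ValuePerNumber(202003101750,202003101700,0,122)
```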

You can also add the missing column to the DataFrame and rename the columns to match the case class, then use it directly:

val someDF = Seq(
  ("202003101750", "202003101700",122),
  ("202003101800", "202003101700",12),
  ("202003101750", "202003101700",42)
).toDF("number", "word","value")

case class ValuePerNumber(number:String, word:String, defaultID:Int, value: Long)

someDF.withColumn("defaultID", lit(0)).as[ValuePerNumber].collect.toSeq

The number of columns and their names in the DataFrame and the case class have to match in order to use as[ValuePerNumber] directly on the DataFrame without extracting values.

  1. size is not available in the DataFrame, so it is added with withColumn.
  2. The column names in the DF and the case class don't match, so the case class is modified to match the DataFrame columns.
scala> :paste
// Entering paste mode (ctrl-D to finish)

val someDF = Seq(("202003101750", "202003101700",122),("202003101800", "202003101700",12),("202003101750", "202003101700",42))
.toDF("number", "word","value")
.withColumn("size",lit(0)) // Added this to match your case class columns


// Exiting paste mode, now interpreting.

someDF: org.apache.spark.sql.DataFrame = [number: string, word: string ... 2 more fields]

scala> case class ValuePerNumber(number:String, word:String, value:Int, size: Long=0) // Modified column names to match your dataframe column names.
defined class ValuePerNumber

scala> someDF.as[ValuePerNumber].show(false)
+------------+------------+-----+----+
|number      |word        |value|size|
+------------+------------+-----+----+
|202003101750|202003101700|122  |0   |
|202003101800|202003101700|12   |0   |
|202003101750|202003101700|42   |0   |
+------------+------------+-----+----+
