从另一个数据集中查找 select、spark.read() 的列 - Spark Scala

Find columns to select, for spark.read(), from another Dataset - Spark Scala

我有一个具有以下架构的 Dataset[Year]

case class Year(day: Int, month: Int, Year: Int)

有没有办法收集当前模式?

我试过:

println("Print  -> "+ds.collect().toList)

但结果是: Print -> List([01,01,2022], [31,01,2022])

我期望是这样的: Print -> List(Year(01,01,2022), Year(31,01,2022)

我知道我可以使用地图对其进行调整,但我正在尝试创建一个接受任何模式的通用方法,为此我无法添加地图进行转换。

这是我的方法:

class SchemeList[A]{

  def set[A](ds: Dataset[A]): List[A] = {
    ds.collect().toList
  }

}

显然方法 return 获得了正确的签名,但是当 运行 引擎出现错误时:

val setYears = new SchemeList[Year]
val YearList: List[Year] = setYears.set(df)
Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to schemas.Schemas$Year

可能你得到的是 DataFrame,而不是 DataSet。 尝试使用“as”将数据框转换为数据集。 像这样

val year = Year(1,1,1)
val years = Array(year,year).toList
import spark.implicits._
val df = spark.
  sparkContext
  .parallelize(years)
  .toDF("day","month","Year")
  .as[Year]
println(df.collect().toList)

根据您评论中的附加信息:

I need this list to use as variables when creating another dataframe via jdbc (I need to make a specific select within postgresql). Is there a more performative way to pass values from a dataframe as parameters in a select?

鉴于您的初始数据集:

val yearsDS: Dataset[Year] = ???

并且您想执行以下操作:

val desiredColumns: Array[String] = ???

spark.read.jdbc(..).select(desiredColumns.head, desiredColumns.tail: _*)

您可以通过以下操作找到 yearsDS 的列名:

val desiredColumns: Array[String] = yearsDS.columns

Spark 通过使用在 Dataset 上定义的 def schema 来实现这一点。 你可以看到 definition of def columns here.