如何将 GroupedDataset 保存到镶木地板或将其转换为 toDF

How to save GroupedDataset to parquet or convert it to toDF

我正在使用 spark 1.6.1。

是否有 API 可用于将 GroupDataset 保存到 parquet 文件。 或者将其转换为 DataFrame。

例如我有一个自定义对象 'Procedure',我已将 Dataframe 转换为过程对象。 之后,我正在对 patientID 进行分组。 我想将 groupdDs 到镶木地板文件或将其作为 Dataframe 传递给其他函数。 我没有得到任何 API 用于存储或将其转换为 Dataframe。

val procedureDs: Dataset[Procedure] = joinDf.select("patientid", "patientprocedureid", "procedurecode").as[Procedure]
val groupedDs:GroupedDataset[Long, Procedure] = procedureDs.groupBy{ x => x.patientid } 

应用 mapGroups 后

val a = groupedDs.mapGroups{ case (k,vs) => { (k, vs.toSeq)}}

它给出以下错误:

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.....PatientDiagnosis
- array element class: "com....PatientDiagnosis"
- field (class: "scala.collection.Seq", name: "_2")
- root class: "scala.Tuple2"

我曾尝试给出明确的编码器

val a = groupedDigDs.mapGroups((k,vs) =>  (k, vs.toSeq))(org.apache.spark.sql.Encoders.bean(classOf[(Long, Seq[com....PatientDiagnosis])]))

然后错误更改为:

 java.lang.UnsupportedOperationException: Cannot infer type for class scala.Tuple2 because it is not bean-compliant

GroupedData相同(RelationalGroupedDataset in Spark 2.x),GroupedDatasetKeyValueGroupedDataset in Spark 2.x)必须聚合才能保存。

如果你的目标是另一个 groupByKey 你可以使用 mapGroups:

val groupedDs: GroupedDataset[K, V] = ???
// ... { case (k, xs) => (k, xs.toSeq) }  to preserve key as well
groupedDs.mapGroups { case (_, xs) => xs.toSeq }

并写下结果。