
DataFrame to Array of Jsons

I have a DataFrame as follows:

+-------------+-------------+-------------+
| columnName1 | columnName2 | columnName3 |
+-------------+-------------+-------------+
| 001         | 002         | 003         |
+-------------+-------------+-------------+
| 004         | 005         | 006         |
+-------------+-------------+-------------+

I want to convert it to JSON in the expected format below.

Expected format:

[[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName3","value":"003"}],[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName3","value":"006"}]]

Thanks in advance.

I have already tried this with the Play JSON API:
val allColumns: Seq[String] = DF.columns.toSeq
val result = DF
  .limit(recordLimit)
  .map { row =>
    val kv: Map[String, String] = row.getValuesMap[String](allColumns)
    kv.map { x =>
      Json
        .toJson(
          List(
            "key"   -> x._1,
            "value" -> x._2
          ).toMap
        )
        .toString()
    }.mkString("[", ", ", "]")
  }
  .take(10)

It currently comes out in this format:

["[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName3","value":"003"}]","[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName3","value":"006"}]"]

But I need the expected format below, using Play JSON with an encoder:

[[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName3","value":"003"}],[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName3","value":"006"}]]

I am facing this issue:

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]       .map { row =>

Basically, I need to convert an Array[String] into an Array[Array[JsValue]].
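The shape of the String-based workaround the answers below use can be sketched without Spark or Play JSON at all. In this plain-Scala sketch, the object name `ArrayOfJsonsSketch`, the helper `rowToJson`, and the sample rows are hypothetical stand-ins: each row becomes a per-row JSON array string (which Spark can encode, unlike JsValue), and the row strings are then joined into one top-level array.

```scala
object ArrayOfJsonsSketch {
  // Build a per-row JSON array of {"key":...,"value":...} objects by hand.
  // In the Spark version, this is what each String returned from map looks like.
  def rowToJson(row: Seq[(String, String)]): String =
    row
      .map { case (k, v) => s"""{"key":"$k","value":"$v"}""" }
      .mkString("[", ",", "]")

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      Seq("columnName1" -> "001", "columnName2" -> "002", "columnName3" -> "003"),
      Seq("columnName1" -> "004", "columnName2" -> "005", "columnName3" -> "006")
    )
    // Join the per-row strings and wrap them in "[]" to get the nested array.
    val out = rows.map(rowToJson).mkString("[", ",", "]")
    println(out)
  }
}
```

Swapping `rows.map(...)` for a Spark `map` over the DataFrame plus a `collect()` gives exactly the structure the answers describe.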

The exception above is thrown because Spark has no encoder for JsValue, so it cannot serialize/deserialize it in a Dataset. You could look into writing a custom Spark encoder for that. Alternatively, instead of returning a JsValue from the DataFrame map operation, return its toString.

One approach could be:

  1. Convert each DF row into a JSON-compatible string - [{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName3","value":"003"}]
  2. Collect the DF as an array/list and mkString with "," as the delimiter
  3. Wrap the resulting string in "[]"

Warning: the code below uses collect(), which brings all rows to the Spark driver and can choke it on large data.

//CSV
c1,c2,c3
001,002,003
004,005,006

//Code
import play.api.libs.json.Json
val df = spark.read.option("header", "true").csv("array_json.csv")
val allColumns = df.schema.map(s => s.name)
//Import Spark implicits for the String Encoder
import spark.implicits._
val sdf = df.map(row => {
  val kv = row.getValuesMap[String](allColumns)
  Json.toJson(kv.map(x => {
    List(
      "key" -> x._1,
      "value" -> x._2
    ).toMap
  })).toString()
})

val dfString = sdf.collect().mkString(",")
val op = s"[$dfString]"
println(op)

Output:

[[{"key":"c1","value":"001"},{"key":"c2","value":"002"},{"key":"c3","value":"003"}],[{"key":"c1","value":"004"},{"key":"c2","value":"005"},{"key":"c3","value":"006"}]]

Another approach, without RDDs:

  import org.apache.spark.sql.Column
  import org.apache.spark.sql.functions._
  import spark.implicits._
  val df = List((1, 2, 3), (11, 12, 13), (21, 22, 23)).toDF("A", "B", "C")

  val fKeyValue = (name: String) => 
                   struct(lit(name).as("key"), col(name).as("value"))

  val lstCol = df.columns.foldLeft(List[Column]())((a, b) => fKeyValue(b) :: a)

  val dsJson =
     df
        .select(collect_list(array(lstCol: _*)).as("obj"))
        .toJSON

  import play.api.libs.json._
  val json: JsValue = Json.parse(dsJson.first())
  val arrJson = json \ "obj"
  println(arrJson)
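One subtlety in the snippet above, sketched in plain Scala: foldLeft with `::` prepends, so `lstCol` ends up in reverse column order (C, B, A). If the original column order matters, reverse the fold result or build the list with `map` instead.

```scala
// Plain-Scala illustration of the foldLeft pattern used above:
// prepending with "::" reverses the traversal order.
val cols = List("A", "B", "C")

val folded = cols.foldLeft(List[String]())((a, b) => b :: a)
// folded is List("C", "B", "A") - reversed

// To keep the original order, either reverse the fold result...
val reordered = folded.reverse
// ...or use map, which preserves order.
val mapped = cols.map(identity)
```

The same applies to the `Column` version: `df.columns.foldLeft(List[Column]())(...)` emits the key/value structs right-to-left.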
    val allColumns: Seq[String] = DF.columns.toSeq
    val result = Json.parse(
      DF
        .limit(recordLimit)
        .map { row =>
          val kv: Map[String, String] = row.getValuesMap[String](allColumns)
          kv.map { x =>
            Json
              .toJson(
                List(
                  "key"   -> x._1,
                  "value" -> x._2
                ).toMap
              )
              .toString()
          }.mkString("[", ", ", "]")
        }
        .take(10)
        .mkString("[", ", ", "]"))

which gives:


    [[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName3","value":"003"}],[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName3","value":"006"}]]