Mongoexport 严格 json 在 Spark 中加载

Mongoexport strict json load in Spark

我有一个使用 mongoexport 从 mongodb 导出数据的过程。 正如 documentation 提到的,所有 json 输出都处于 严格 模式

这意味着数据将如下所示:

"{amount":{"$numberLong":"3"},"count":{"$numberLong":"245"}}

我的 Scala 案例 class 定义为:

case class MongoData(amount: Long, count: Long)

读取数据当然会这样失败:

spark
      .read
      .json(inputPath)
      .as[MongoData]

有没有办法在不使用严格模式的情况下从 mongo 导出,或者在 Scala 中导入 json 而无需手动将每个字段重组为适当的结构?

我现在使用这个作为解决方案。但感觉有点hacky。

case class DataFrameExtended(dataFrame: DataFrame) {

   def undoMongoStrict(): DataFrame = {
    val numberLongType = StructType(List(StructField("$numberLong", StringType, true))) 

    def restructure(fields: Array[StructField], nesting: List[String] = Nil): List[Column] = {
      fields.flatMap(field => {
        val fieldPath = nesting :+ field.name
        val fieldPathStr = fieldPath.mkString(".")
        field.dataType match {
          case dt: StructType if dt == numberLongType =>
            Some(col(s"$fieldPathStr.$$numberLong").cast(LongType).as(field.name))
          case dt: StructType =>
            Some(struct(restructure(dt.fields, fieldPath): _*).as(field.name))
          case _ => Some(col(fieldPathStr).as(field.name))
          //              case dt:ArrayType => //@todo handle other DataTypes Array??
        }
      })
    }.toList


    dataFrame.select(restructure(dataFrame.schema.fields): _*)
  }
}

implicit def dataFrameExtended(df: DataFrame): DataFrameExtended = {
  DataFrameExtended(df)
}

spark
  .read
  .json(inputPath)
  .undoMongoStrict()