如何最好地处理将 MongoRDD 转换为 DataFrame 的模式冲突?

How best to handle schema conflicts converting MongoRDD to DataFrame?

我正在尝试从 mongo 数据库中读取一些文档并解析 spark DataFrame 中的模式。到目前为止,我已经成功地从 mongo 中读取数据并将生成的 mongoRDD 转换为使用案例 类 定义的模式的 DataFrame,但是有一个场景 mongo 集合有一个包含多种数据类型的字段(字符串数组与嵌套对象数组)。到目前为止,我只是将字段解析为字符串,然后使用 spark sql 的 from_json() 来解析新模式中的嵌套对象,但我发现当字段不符合模式,它 return 对模式中的所有字段都是 null - 而不仅仅是不符合的字段。有没有一种方法可以解析它,以便只有与架构不匹配的字段才会 return 为空?

//creating mongo test data in mongo shell
db.createCollection("testColl")
db.testColl.insertMany([
    { "foo" : ["fooString1", "fooString2"], "bar" : "barString"},
    { "foo" : [{"uid" : "fooString1"}, {"uid" : "fooString2"}], "bar" : "barString"}
])


import com.mongodb.spark.config.ReadConfig
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.functions._
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.types.{StringType, StructField, StructType}

//mongo connector and read config
val testConfig = ReadConfig(Map("uri" -> "mongodb://some.mongo.db",
    "database" -> "testDB",
    "collection" -> "testColl"
  ))



//Option 1: 'lowest common denominator' case class - works, but leaves the nested struct type value as json that then needs additional parsing

case class stringArray (foo: Option[Seq[String]], bar: Option[String])
val df1 : DataFrame = MongoSpark.load(spark.sparkContext, testConfig).toDF[stringArray]
df1.show()
+--------------------+---------+
|                 foo|      bar|
+--------------------+---------+
|[fooString1, fooS...|barString|
|[{ "uid" : "fooSt...|barString|
+--------------------+---------+


//Option 2: accurate case class - fails with:
//com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a StructType(StructField(uid,StringType,true)) (value: BsonString{value='fooString1'})

case class fooDoc (uid: Option[String])
case class docArray (foo: Option[Seq[fooDoc]], bar: Option[String])
val df2 : DataFrame = MongoSpark.load(spark.sparkContext, testConfig).toDF[docArray]


//Option 3: map all rows to json string, then use from_json - why does return null for 'bar' in the case of the schema that doesn't fit?

val mrdd = MongoSpark.load(spark.sparkContext, testConfig)
val jsonRDD = mrdd.map(x => Row(x.toJson()))
val simpleSchema = StructType(Seq(StructField("wholeRecordJson", StringType, true)))
val schema = ScalaReflection.schemaFor[docArray].dataType.asInstanceOf[StructType]
val jsonDF = spark.createDataFrame(jsonRDD, simpleSchema)
val df3 = jsonDF.withColumn("parsed",from_json($"wholeRecordJson", schema))
df3.select("parsed.foo", "parsed.bar").show()
+--------------------+---------+
|                 foo|      bar|
+--------------------+---------+
|                null|     null|
|[[fooString1], [f...|barString|
+--------------------+---------+


//Desired results:
//desired outcome is for only the field not matching the schema (string type of 'foo') is null, but matching columns are populated

+--------------------+---------+
|                 foo|      bar|
+--------------------+---------+
|                null|barString|
|[[fooString1], [f...|barString|
+--------------------+---------+

不,没有简单的方法可以做到这一点,因为在同一文档集合中合并不兼容的模式是一种反模式,即使在 Mongo 中也是如此。

处理这个问题的主要方法有以下三种:

  1. 修复Mongo数据库中的数据。

  2. 发出 "normalizes" Mongo 模式的查询,例如,删除具有不兼容类型的字段或转换它们或重命名它们等

  3. 针对特定模式类型的文档向 Mongo 发出单独的查询。 (Mongo 具有可以根据字段类型进行过滤的查询运算符。)然后 post 在 Spark 中进行处理,最后将数据联合到单个 Spark 数据集中。