加载 Mongo 集合作为 Spark 数据集时如何在模式中指定 BigDecimal 比例和精度

How to specify BigDecimal scale and precision in schema when loading a Mongo collection as a Spark Dataset

我正在尝试使用 Scala Mongo 连接器将大型 Mongo 集合加载到 Apache Spark 中。

我正在使用以下版本:

libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0" 
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0" 
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"
scalaVersion := "2.12.12"
openjdk version "11.0.8" 2020-07-14

该集合包含大于 1e13 的大整数十进制值。我想获取的数据集是一个集合,对应的案例class叫Output,定义为:

case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)

如果我使用 MongoSpark.load 而不指定大小写 class:

val ds = MongoSpark.load(sc, rc).toDS[Output]

然后 Mongo 通过随机抽取集合来推断模式。这会导致 value 属性的随机比例,并且任何 value 溢出随机获得的比例的文档在生成的数据集中都会缺少 value 属性。这显然是不希望的。

或者,根据 documentation for the Mongo Spark connector,我可以通过将大小写 class 指定为 load 的类型参数化来显式设置模式,例如:

val ds = MongoSpark.load[Output](sc, rc).toDS[Output]

但是,在 case-class 定义中,我只能将 value 的类型指定为 BigDecimal,这不允许我明确说明所需的比例和精度。生成的模式使用默认精度和比例 (38,18),这并不总是需要的:

root
 |-- time: long (nullable = false)
 |-- pubKeyId: long (nullable = false)
 |-- value: decimal(38,18) (nullable = true)
 |-- outIndex: long (nullable = false)
 |-- outTxId: long (nullable = false)

这与 Spark SQL API 形成对比,后者允许使用 DecimalType 显式指定比例和精度,例如:

val mySchema = StructType(StructField("value", DecimalType(30, 0)) :: Nil)

在将 Mongo 集合加载到 Apache Spark 时,如何为架构中的大十进制类型请求特定的比例和精度,类似于上面的代码?

根据 this and this,据我所知,Decimal128 中的尾数和指数是固定大小的。除非您能找到相反的证据,否则 MongoDB 允许为其小数指定比例和精度是没有意义的。

我的理解是关系数据库会根据比例和精度使用不同的浮点类型(例如 32 位与 64 位浮点数),但在 MongoDB 中数据库会保留它给定的类型,所以如果你想要一个较短的浮点数,您需要让您的应用程序发送它而不是十进制类型。

我可以通过绕过 load 辅助方法并直接在 MongoSpark 实例上调用 toDF(schema) 来做到这一点:

 val schema = StructType(
                             List(StructField("time", LongType, false),
                                  StructField("pubKeyId", LongType, false),
                                  StructField("value", DecimalType(30, 0), false),
                                  StructField("outIndex", LongType, false),
                                  StructField("outTxId", LongType, false)
                             ))
    val outputs =    
      builder().sparkContext(sc).readConfig(rc).build().toDF(schema).as[Output]

这会产生正确的模式,并且数据会被正确读入 Spark,没有任何缺失值:

    outputs.printSchema()
 |-- time: long (nullable = false)
 |-- pubKeyId: long (nullable = false)
 |-- value: decimal(30,0) (nullable = false)
 |-- outIndex: long (nullable = false)
 |-- outTxId: long (nullable = false)