加载 Mongo 集合作为 Spark 数据集时如何在模式中指定 BigDecimal 比例和精度
How to specify BigDecimal scale and precision in schema when loading a Mongo collection as a Spark Dataset
我正在尝试使用 Scala Mongo 连接器将大型 Mongo 集合加载到 Apache Spark 中。
我正在使用以下版本:
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"
scalaVersion := "2.12.12"
openjdk version "11.0.8" 2020-07-14
该集合包含大于 1e13
的大整数十进制值。我想获取的数据集是一个集合,对应的案例class叫Output
,定义为:
case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)
如果我使用 MongoSpark.load 而不指定大小写 class:
val ds = MongoSpark.load(sc, rc).toDS[Output]
然后 Mongo 通过随机抽取集合来推断模式。这会导致 value
属性的随机比例,并且任何 value
溢出随机获得的比例的文档在生成的数据集中都会缺少 value
属性。这显然是不希望的。
或者,根据 documentation for the Mongo Spark connector,我可以通过将大小写 class 指定为 load
的类型参数化来显式设置模式,例如:
val ds = MongoSpark.load[Output](sc, rc).toDS[Output]
但是,在 case-class 定义中,我只能将 value
的类型指定为 BigDecimal
,这不允许我明确说明所需的比例和精度。生成的模式使用默认精度和比例 (38,18),这并不总是需要的:
root
|-- time: long (nullable = false)
|-- pubKeyId: long (nullable = false)
|-- value: decimal(38,18) (nullable = true)
|-- outIndex: long (nullable = false)
|-- outTxId: long (nullable = false)
这与 Spark SQL API 形成对比,后者允许使用 DecimalType
显式指定比例和精度,例如:
val mySchema = StructType(StructField("value", DecimalType(30, 0)) :: Nil)
在将 Mongo 集合加载到 Apache Spark 时,如何为架构中的大十进制类型请求特定的比例和精度,类似于上面的代码?
根据 this and this,据我所知,Decimal128 中的尾数和指数是固定大小的。除非您能找到相反的证据,否则 MongoDB 允许为其小数指定比例和精度是没有意义的。
我的理解是关系数据库会根据比例和精度使用不同的浮点类型(例如 32 位与 64 位浮点数),但在 MongoDB 中数据库会保留它给定的类型,所以如果你想要一个较短的浮点数,您需要让您的应用程序发送它而不是十进制类型。
我可以通过绕过 load
辅助方法并直接在 MongoSpark
实例上调用 toDF(schema)
来做到这一点:
val schema = StructType(
List(StructField("time", LongType, false),
StructField("pubKeyId", LongType, false),
StructField("value", DecimalType(30, 0), false),
StructField("outIndex", LongType, false),
StructField("outTxId", LongType, false)
))
val outputs =
builder().sparkContext(sc).readConfig(rc).build().toDF(schema).as[Output]
这会产生正确的模式,并且数据会被正确读入 Spark,没有任何缺失值:
outputs.printSchema()
|-- time: long (nullable = false)
|-- pubKeyId: long (nullable = false)
|-- value: decimal(30,0) (nullable = false)
|-- outIndex: long (nullable = false)
|-- outTxId: long (nullable = false)
我正在尝试使用 Scala Mongo 连接器将大型 Mongo 集合加载到 Apache Spark 中。
我正在使用以下版本:
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.0.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.2"
scalaVersion := "2.12.12"
openjdk version "11.0.8" 2020-07-14
该集合包含大于 1e13
的大整数十进制值。我想获取的数据集是一个集合,对应的案例class叫Output
,定义为:
case class Output(time: Long, pubKeyId: Long, value: BigDecimal, outIndex: Long, outTxId: Long)
如果我使用 MongoSpark.load 而不指定大小写 class:
val ds = MongoSpark.load(sc, rc).toDS[Output]
然后 Mongo 通过随机抽取集合来推断模式。这会导致 value
属性的随机比例,并且任何 value
溢出随机获得的比例的文档在生成的数据集中都会缺少 value
属性。这显然是不希望的。
或者,根据 documentation for the Mongo Spark connector,我可以通过将大小写 class 指定为 load
的类型参数化来显式设置模式,例如:
val ds = MongoSpark.load[Output](sc, rc).toDS[Output]
但是,在 case-class 定义中,我只能将 value
的类型指定为 BigDecimal
,这不允许我明确说明所需的比例和精度。生成的模式使用默认精度和比例 (38,18),这并不总是需要的:
root
|-- time: long (nullable = false)
|-- pubKeyId: long (nullable = false)
|-- value: decimal(38,18) (nullable = true)
|-- outIndex: long (nullable = false)
|-- outTxId: long (nullable = false)
这与 Spark SQL API 形成对比,后者允许使用 DecimalType
显式指定比例和精度,例如:
val mySchema = StructType(StructField("value", DecimalType(30, 0)) :: Nil)
在将 Mongo 集合加载到 Apache Spark 时,如何为架构中的大十进制类型请求特定的比例和精度,类似于上面的代码?
根据 this and this,据我所知,Decimal128 中的尾数和指数是固定大小的。除非您能找到相反的证据,否则 MongoDB 允许为其小数指定比例和精度是没有意义的。
我的理解是关系数据库会根据比例和精度使用不同的浮点类型(例如 32 位与 64 位浮点数),但在 MongoDB 中数据库会保留它给定的类型,所以如果你想要一个较短的浮点数,您需要让您的应用程序发送它而不是十进制类型。
我可以通过绕过 load
辅助方法并直接在 MongoSpark
实例上调用 toDF(schema)
来做到这一点:
val schema = StructType(
List(StructField("time", LongType, false),
StructField("pubKeyId", LongType, false),
StructField("value", DecimalType(30, 0), false),
StructField("outIndex", LongType, false),
StructField("outTxId", LongType, false)
))
val outputs =
builder().sparkContext(sc).readConfig(rc).build().toDF(schema).as[Output]
这会产生正确的模式,并且数据会被正确读入 Spark,没有任何缺失值:
outputs.printSchema()
|-- time: long (nullable = false)
|-- pubKeyId: long (nullable = false)
|-- value: decimal(30,0) (nullable = false)
|-- outIndex: long (nullable = false)
|-- outTxId: long (nullable = false)