为什么 BigDecimal 的 Spark groupBy.agg(min/max) 总是 return 0?
Why does Spark groupBy.agg(min/max) of BigDecimal always return 0?
我正在尝试按 DataFrame 的一列进行分组,并在每个结果组中生成 BigDecimal 列的 min
和 max
值。结果总是产生一个非常小(大约为 0)的值。
(针对 Double 列的类似 min/max
调用会产生预期的非零值。)
举个简单的例子:
如果我创建以下 DataFrame:
import org.apache.spark.sql.{functions => f}
case class Foo(group: String, bd_value: BigDecimal, d_value: Double)
val rdd = spark.sparkContext.parallelize(Seq(
Foo("A", BigDecimal("1.0"), 1.0),
Foo("B", BigDecimal("10.0"), 10.0),
Foo("B", BigDecimal("1.0"), 1.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0)
))
val df = rdd.toDF()
选择 Double 或 BigDecimal 列的 max
return 预期结果:
df.select(f.max("d_value")).show()
// +------------+
// |max(d_value)|
// +------------+
// | 10.0|
// +------------+
df.select(f.max("bd_value")).show()
// +--------------------+
// | max(bd_value)|
// +--------------------+
// |10.00000000000000...|
// +--------------------+
但是如果我分组然后聚合,我会得到 Double 列的合理结果,但 BigDecimal 列的值接近零:
df.groupBy("group").agg(f.max("d_value")).show()
// +-----+------------+
// |group|max(d_value)|
// +-----+------------+
// | B| 10.0|
// | C| 10.0|
// | A| 1.0|
// +-----+------------+
df.groupBy("group").agg(f.max("bd_value")).show()
// +-----+-------------+
// |group|max(bd_value)|
// +-----+-------------+
// | B| 1.00E-16|
// | C| 1.00E-16|
// | A| 1.0E-17|
// +-----+-------------+
为什么 spark return 这些 min/max
调用的结果为零?
TL;DR
Spark 如何处理问题中显示的特定案例中体现的 BigDecimals
的规模似乎存在不一致。代码的行为就像是使用 BigDecimal
对象的比例将 BigDecimal
s 转换为未缩放的 Long
s,然后使用模式的比例转换回 BigDecimal
.
这可以通过以下方式解决
- 使用
setScale
或 明确设置所有BigDecimal
值的比例以匹配DataFrame的架构
- 手动指定模式并从 RDD[行] 创建 DF
长版
这是我认为在我的机器上使用 Spark 2.4.0 时发生的情况。
在 groupBy.max
的情况下,Spark 正在经历 UnsafeRow and converting the BigDecimal
to an unscaled Long
and storing it as a Byte array in setDecimal
at this line (as verified with print statements). Then, when it later calls getDecimal,它使用架构中指定的 比例将字节数组转换回 BigDecimal
=56=]。
如果原始值中的比例与架构中的比例不匹配,则会导致值不正确。例如,
val foo = BigDecimal(123456)
foo.scale
0
val bytes = foo.underlying().unscaledValue().toByteArray()
// convert the bytes into BigDecimal using the original scale -- correct value
val sameValue = BigDecimal(new java.math.BigInteger(bytes), 0)
sameValue: scala.math.BigDecimal = 123456
// convert the bytes into BigDecimal using scale 18 -- wrong value
val smaller = BigDecimal(new java.math.BigInteger(bytes), 18)
smaller: scala.math.BigDecimal = 1.23456E-13
如果我只是 select bd_value
列的最大值,Spark 似乎不会通过 setDecimal
。我还没有验证为什么,或者它去了哪里。
但是,这可以解释问题中观察到的值。使用相同的大小写 class Foo
:
// This BigDecimal has scale 0
val rdd = spark.sparkContext.parallelize(Seq(Foo("C", BigDecimal(123456), 123456.0)))
// And shows with scale 0 in the DF
rdd.toDF.show
+-----+--------+--------+
|group|bd_value| d_value|
+-----+--------+--------+
| C| 123456|123456.0|
+-----+--------+--------+
// But the schema has scale 18
rdd.toDF.printSchema
root
|-- group: string (nullable = true)
|-- bd_value: decimal(38,18) (nullable = true)
|-- d_value: double (nullable = false)
// groupBy + max corrupts in the same way as converting to bytes via unscaled, then to BigDecimal with scale 18
rdd.groupBy("group").max("bd_value").show
+-----+-------------+
|group|max(bd_value)|
+-----+-------------+
| C| 1.23456E-13|
+-----+-------------+
// This BigDecimal is forced to have the same scale as the inferred schema
val rdd = spark.sparkContext.parallelize(Seq(Foo("C",BigDecimal(123456).setScale(18), 123456.0)))
// verified the scale is 18 in the DF
+-----+--------------------+--------+
|group| bd_value| d_value|
+-----+--------------------+--------+
| C|123456.0000000000...|123456.0|
+-----+--------------------+--------+
// And it works as expected
rdd1.groupBy("group").max("bd_value").show
+-----+--------------------+
|group| max(bd_value)|
+-----+--------------------+
| C|123456.0000000000...|
+-----+--------------------+
这也可以解释为什么,正如评论中所观察到的,当从具有显式模式的 RDD[Row] 转换时,它工作正常。
val rdd2 = spark.sparkContext.parallelize(Seq(Row("C", BigDecimal(123456), 123456.0)))
// schema has BigDecimal scale 18
val schema = StructType(Seq(StructField("group", StringType, true), StructField("bd_value", DecimalType(38,18), true), StructField("d_value",DoubleType,false)))
// createDataFrame interprets the value into the schema's scale
val df = spark.createDataFrame(rdd2, schema)
df.show
+-----+--------------------+--------+
|group| bd_value| d_value|
+-----+--------------------+--------+
| C|123456.0000000000...|123456.0|
+-----+--------------------+--------+
我正在尝试按 DataFrame 的一列进行分组,并在每个结果组中生成 BigDecimal 列的 min
和 max
值。结果总是产生一个非常小(大约为 0)的值。
(针对 Double 列的类似 min/max
调用会产生预期的非零值。)
举个简单的例子:
如果我创建以下 DataFrame:
import org.apache.spark.sql.{functions => f}
case class Foo(group: String, bd_value: BigDecimal, d_value: Double)
val rdd = spark.sparkContext.parallelize(Seq(
Foo("A", BigDecimal("1.0"), 1.0),
Foo("B", BigDecimal("10.0"), 10.0),
Foo("B", BigDecimal("1.0"), 1.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0),
Foo("C", BigDecimal("10.0"), 10.0)
))
val df = rdd.toDF()
选择 Double 或 BigDecimal 列的 max
return 预期结果:
df.select(f.max("d_value")).show()
// +------------+
// |max(d_value)|
// +------------+
// | 10.0|
// +------------+
df.select(f.max("bd_value")).show()
// +--------------------+
// | max(bd_value)|
// +--------------------+
// |10.00000000000000...|
// +--------------------+
但是如果我分组然后聚合,我会得到 Double 列的合理结果,但 BigDecimal 列的值接近零:
df.groupBy("group").agg(f.max("d_value")).show()
// +-----+------------+
// |group|max(d_value)|
// +-----+------------+
// | B| 10.0|
// | C| 10.0|
// | A| 1.0|
// +-----+------------+
df.groupBy("group").agg(f.max("bd_value")).show()
// +-----+-------------+
// |group|max(bd_value)|
// +-----+-------------+
// | B| 1.00E-16|
// | C| 1.00E-16|
// | A| 1.0E-17|
// +-----+-------------+
为什么 spark return 这些 min/max
调用的结果为零?
TL;DR
Spark 如何处理问题中显示的特定案例中体现的 BigDecimals
的规模似乎存在不一致。代码的行为就像是使用 BigDecimal
对象的比例将 BigDecimal
s 转换为未缩放的 Long
s,然后使用模式的比例转换回 BigDecimal
.
这可以通过以下方式解决
- 使用
setScale
或 明确设置所有 - 手动指定模式并从 RDD[行] 创建 DF
BigDecimal
值的比例以匹配DataFrame的架构
长版
这是我认为在我的机器上使用 Spark 2.4.0 时发生的情况。
在 groupBy.max
的情况下,Spark 正在经历 UnsafeRow and converting the BigDecimal
to an unscaled Long
and storing it as a Byte array in setDecimal
at this line (as verified with print statements). Then, when it later calls getDecimal,它使用架构中指定的 比例将字节数组转换回 BigDecimal
=56=]。
如果原始值中的比例与架构中的比例不匹配,则会导致值不正确。例如,
val foo = BigDecimal(123456)
foo.scale
0
val bytes = foo.underlying().unscaledValue().toByteArray()
// convert the bytes into BigDecimal using the original scale -- correct value
val sameValue = BigDecimal(new java.math.BigInteger(bytes), 0)
sameValue: scala.math.BigDecimal = 123456
// convert the bytes into BigDecimal using scale 18 -- wrong value
val smaller = BigDecimal(new java.math.BigInteger(bytes), 18)
smaller: scala.math.BigDecimal = 1.23456E-13
如果我只是 select bd_value
列的最大值,Spark 似乎不会通过 setDecimal
。我还没有验证为什么,或者它去了哪里。
但是,这可以解释问题中观察到的值。使用相同的大小写 class Foo
:
// This BigDecimal has scale 0
val rdd = spark.sparkContext.parallelize(Seq(Foo("C", BigDecimal(123456), 123456.0)))
// And shows with scale 0 in the DF
rdd.toDF.show
+-----+--------+--------+
|group|bd_value| d_value|
+-----+--------+--------+
| C| 123456|123456.0|
+-----+--------+--------+
// But the schema has scale 18
rdd.toDF.printSchema
root
|-- group: string (nullable = true)
|-- bd_value: decimal(38,18) (nullable = true)
|-- d_value: double (nullable = false)
// groupBy + max corrupts in the same way as converting to bytes via unscaled, then to BigDecimal with scale 18
rdd.groupBy("group").max("bd_value").show
+-----+-------------+
|group|max(bd_value)|
+-----+-------------+
| C| 1.23456E-13|
+-----+-------------+
// This BigDecimal is forced to have the same scale as the inferred schema
val rdd = spark.sparkContext.parallelize(Seq(Foo("C",BigDecimal(123456).setScale(18), 123456.0)))
// verified the scale is 18 in the DF
+-----+--------------------+--------+
|group| bd_value| d_value|
+-----+--------------------+--------+
| C|123456.0000000000...|123456.0|
+-----+--------------------+--------+
// And it works as expected
rdd1.groupBy("group").max("bd_value").show
+-----+--------------------+
|group| max(bd_value)|
+-----+--------------------+
| C|123456.0000000000...|
+-----+--------------------+
这也可以解释为什么,正如评论中所观察到的,当从具有显式模式的 RDD[Row] 转换时,它工作正常。
val rdd2 = spark.sparkContext.parallelize(Seq(Row("C", BigDecimal(123456), 123456.0)))
// schema has BigDecimal scale 18
val schema = StructType(Seq(StructField("group", StringType, true), StructField("bd_value", DecimalType(38,18), true), StructField("d_value",DoubleType,false)))
// createDataFrame interprets the value into the schema's scale
val df = spark.createDataFrame(rdd2, schema)
df.show
+-----+--------------------+--------+
|group| bd_value| d_value|
+-----+--------------------+--------+
| C|123456.0000000000...|123456.0|
+-----+--------------------+--------+