从 spark 查询镶木地板 table 上的配置单元时损坏的十进制值

Corrupt Decimal value when querying a hive on parquet table from spark

在使用 Spark SQL.

查询 Spark 代码中实木复合地板上的外部配置单元 table 时,Spark 返回 garbage/incorrect 十进制字段的值

在我的应用程序流程中,一个 spark 进程最初将这些 parquet 文件的数据直接写入存在外部 Hive table 的 HDFS。当第二个 Spark 进程尝试使用 Spark-SQL.

从 Hive table 使用时,获取了不正确的数据

场景步骤:这是一个重现问题的简单演示:

  1. 写入 Parquet:我正在将数据写入 HDFS 中的 parquet 文件,Spark 本身假定小数字段的精度为 Decimal(28,26)

    scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
    df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    scala> df.schema
    res0: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,false), StructField(value,DecimalType(28,26),false))
    scala> df.show
    +-----+--------------------+
    | name|               value|
    +-----+--------------------+
    |dummy|10.70000000000000...|
    +-----+--------------------+
    scala> df.write.option("overwrite",true).parquet("/my/hdfs/location/test")
    
  2. 读取 parquet 文件:查看值是否正确写入。

    scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("/tenants/gwm/morph/test/tablePrecisionTest/test")
    df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
    scala> df_parq.show
    +-------+--------------------+
    |   name|               value|
    +-------+--------------------+
    |  dummy|10.70000000000000...|
    +-------+--------------------+ 
    
  3. 创建外部配置单元table:在 parquet 位置之上,小数字段为 Decimal(18,6)

    hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
    
  4. 运行 直线中的 Hive 查询:验证是否返回了正确的数据。

    hive> select * from db1.test_precision;
    +----------------------+-----------------------+--+
    | test_precision.name  | test_precision.value  |
    +----------------------+-----------------------+--+
    | dummy                | 10.7                  |
    +----------------------+-----------------------+--+
    
  5. 运行 使用 Spark Sql 的相同查询:生成了不正确的十进制值。

    scala> val df_hive = spark.sql("select * from db1.test_precision")
    df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(18,6)]
    scala> df_hive.show
    +-----+-----------+
    | name|      value|
    +-----+-----------+
    |dummy|-301.989888|
    +-----+-----------+
    

注意 - 我知道在第一步使用显式 cast(value as Decima(18,6)) 将值存储到 parquet 可以解决这个问题,但我已经有了我的历史数据无法立即重新加载。

有什么方法可以在读取第 5 步的值时解决这个问题?

我完全复制了您的示例,除了第 3 步。在为 Decimal 类型创建 table 时,您应该保持精度和比例。

在你的例子中,你创建了一个 Decimal(28,26)

df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

所以你应该创建一个 table 具有相同的精度和十进制类型的小数位数。

hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
/**AND NOT**/
hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

scala> df.show()
+-----+--------------------+
| name|               value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+


scala> df.printSchema()
root
 |-- name: string (nullable = false)
 |-- value: decimal(28,26) (nullable = false)

scala> df.write.option("overwrite",true).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")

scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

scala> df_parq.printSchema
root
 |-- name: string (nullable = true)
 |-- value: decimal(28,26) (nullable = true)


scala> df_parq.show
+-----+--------------------+
| name|               value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+

hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';


hive> select * from test_precision;

+----------------------+-----------------------+--+
| test_precision.name  | test_precision.value  |
+----------------------+-----------------------+--+
| dummy                | 10.7                  |
+----------------------+-----------------------+--+

scala> val df_hive = spark.sql("select * from test.test_precision")
df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]

scala> df_hive.show
+-----+--------------------+
| name|               value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+


scala> df_hive.printSchema
root
 |-- name: string (nullable = true)
 |-- value: decimal(28,26) (nullable = true)