从 spark 查询镶木地板 table 上的配置单元时损坏的十进制值
Corrupt Decimal value when querying a hive on parquet table from spark
在使用 Spark SQL.
查询 Spark 代码中实木复合地板上的外部配置单元 table 时,Spark 返回 garbage/incorrect 十进制字段的值
在我的应用程序流程中,一个 spark 进程最初将这些 parquet 文件的数据直接写入存在外部 Hive table 的 HDFS。当第二个 Spark 进程尝试使用 Spark-SQL.
从 Hive table 使用时,获取了不正确的数据
场景步骤:这是一个重现问题的简单演示:
写入 Parquet:我正在将数据写入 HDFS 中的 parquet 文件,Spark 本身假定小数字段的精度为 Decimal(28,26)
。
scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df.schema
res0: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,false), StructField(value,DecimalType(28,26),false))
scala> df.show
+-----+--------------------+
| name| value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
scala> df.write.option("overwrite",true).parquet("/my/hdfs/location/test")
读取 parquet 文件:查看值是否正确写入。
scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("/tenants/gwm/morph/test/tablePrecisionTest/test")
df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df_parq.show
+-------+--------------------+
| name| value|
+-------+--------------------+
| dummy|10.70000000000000...|
+-------+--------------------+
创建外部配置单元table:在 parquet 位置之上,小数字段为 Decimal(18,6)
。
hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
运行 直线中的 Hive 查询:验证是否返回了正确的数据。
hive> select * from db1.test_precision;
+----------------------+-----------------------+--+
| test_precision.name | test_precision.value |
+----------------------+-----------------------+--+
| dummy | 10.7 |
+----------------------+-----------------------+--+
运行 使用 Spark Sql 的相同查询:生成了不正确的十进制值。
scala> val df_hive = spark.sql("select * from db1.test_precision")
df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(18,6)]
scala> df_hive.show
+-----+-----------+
| name| value|
+-----+-----------+
|dummy|-301.989888|
+-----+-----------+
注意 - 我知道在第一步使用显式 cast(value as Decima(18,6))
将值存储到 parquet 可以解决这个问题,但我已经有了我的历史数据无法立即重新加载。
有什么方法可以在读取第 5 步的值时解决这个问题?
我完全复制了您的示例,除了第 3 步。在为 Decimal 类型创建 table 时,您应该保持精度和比例。
在你的例子中,你创建了一个 Decimal(28,26)
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
所以你应该创建一个 table 具有相同的精度和十进制类型的小数位数。
hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
/**AND NOT**/
hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df.show()
+-----+--------------------+
| name| value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
scala> df.printSchema()
root
|-- name: string (nullable = false)
|-- value: decimal(28,26) (nullable = false)
scala> df.write.option("overwrite",true).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df_parq.printSchema
root
|-- name: string (nullable = true)
|-- value: decimal(28,26) (nullable = true)
scala> df_parq.show
+-----+--------------------+
| name| value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
hive> select * from test_precision;
+----------------------+-----------------------+--+
| test_precision.name | test_precision.value |
+----------------------+-----------------------+--+
| dummy | 10.7 |
+----------------------+-----------------------+--+
scala> val df_hive = spark.sql("select * from test.test_precision")
df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df_hive.show
+-----+--------------------+
| name| value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
scala> df_hive.printSchema
root
|-- name: string (nullable = true)
|-- value: decimal(28,26) (nullable = true)
在使用 Spark SQL.
查询 Spark 代码中实木复合地板上的外部配置单元 table 时,Spark 返回 garbage/incorrect 十进制字段的值在我的应用程序流程中,一个 spark 进程最初将这些 parquet 文件的数据直接写入存在外部 Hive table 的 HDFS。当第二个 Spark 进程尝试使用 Spark-SQL.
从 Hive table 使用时,获取了不正确的数据场景步骤:这是一个重现问题的简单演示:
写入 Parquet:我正在将数据写入 HDFS 中的 parquet 文件,Spark 本身假定小数字段的精度为
Decimal(28,26)
。scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value") df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)] scala> df.schema res0: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,false), StructField(value,DecimalType(28,26),false)) scala> df.show +-----+--------------------+ | name| value| +-----+--------------------+ |dummy|10.70000000000000...| +-----+--------------------+ scala> df.write.option("overwrite",true).parquet("/my/hdfs/location/test")
读取 parquet 文件:查看值是否正确写入。
scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("/tenants/gwm/morph/test/tablePrecisionTest/test") df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)] scala> df_parq.show +-------+--------------------+ | name| value| +-------+--------------------+ | dummy|10.70000000000000...| +-------+--------------------+
创建外部配置单元table:在 parquet 位置之上,小数字段为
Decimal(18,6)
。hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
运行 直线中的 Hive 查询:验证是否返回了正确的数据。
hive> select * from db1.test_precision; +----------------------+-----------------------+--+ | test_precision.name | test_precision.value | +----------------------+-----------------------+--+ | dummy | 10.7 | +----------------------+-----------------------+--+
运行 使用 Spark Sql 的相同查询:生成了不正确的十进制值。
scala> val df_hive = spark.sql("select * from db1.test_precision") df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(18,6)] scala> df_hive.show +-----+-----------+ | name| value| +-----+-----------+ |dummy|-301.989888| +-----+-----------+
注意 - 我知道在第一步使用显式 cast(value as Decima(18,6))
将值存储到 parquet 可以解决这个问题,但我已经有了我的历史数据无法立即重新加载。
有什么方法可以在读取第 5 步的值时解决这个问题?
我完全复制了您的示例,除了第 3 步。在为 Decimal 类型创建 table 时,您应该保持精度和比例。
在你的例子中,你创建了一个 Decimal(28,26)
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
所以你应该创建一个 table 具有相同的精度和十进制类型的小数位数。
hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
/**AND NOT**/
hive> create external table db1.test_precision(name string, value Decimal(18,6)) STORED As PARQUET LOCATION '/my/hdfs/location/test';
scala> val df = spark.sql("select 'dummy' as name, 10.70000000000000000000000000 as value")
df: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df.show()
+-----+--------------------+
| name| value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
scala> df.printSchema()
root
|-- name: string (nullable = false)
|-- value: decimal(28,26) (nullable = false)
scala> df.write.option("overwrite",true).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
scala> val df_parq = spark.read.option("spark.sql.decimalOperations.allowPrecisionLoss",false).parquet("hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal")
df_parq: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df_parq.printSchema
root
|-- name: string (nullable = true)
|-- value: decimal(28,26) (nullable = true)
scala> df_parq.show
+-----+--------------------+
| name| value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
hive> CREATE EXTERNAL TABLE test.test_precision(name string, value Decimal(28,26)) STORED AS PARQUET LOCATION 'hdfs://quickstart.cloudera:8020/user/cloudera/test_decimal';
hive> select * from test_precision;
+----------------------+-----------------------+--+
| test_precision.name | test_precision.value |
+----------------------+-----------------------+--+
| dummy | 10.7 |
+----------------------+-----------------------+--+
scala> val df_hive = spark.sql("select * from test.test_precision")
df_hive: org.apache.spark.sql.DataFrame = [name: string, value: decimal(28,26)]
scala> df_hive.show
+-----+--------------------+
| name| value|
+-----+--------------------+
|dummy|10.70000000000000...|
+-----+--------------------+
scala> df_hive.printSchema
root
|-- name: string (nullable = true)
|-- value: decimal(28,26) (nullable = true)