Apache Spark:不支持的镶木地板数据类型
ApacheSpark: Unsupported parquet datatype
我正在尝试使用 SparkSql HiveContext 读取 Hive table。但是,当我提交作业时,出现以下错误:
Exception in thread "main" java.lang.RuntimeException: Unsupported parquet datatype optional fixed_len_byte_array(11) amount (DECIMAL(24,7))
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:77)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:131)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes.apply(ParquetTypes.scala:383)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes.apply(ParquetTypes.scala:380)
列类型为 DECIMAL(24,7)。我已经使用 HiveQL 更改了列类型,但它不起作用。我也试过在 sparksql 中转换为另一种 Decimal 类型,如下所示:
val results = hiveContext.sql("SELECT cast(amount as DECIMAL(18,7)), number FROM dmp_wr.test")
但是,我遇到了同样的错误。我的代码是这样的:
def main(args: Array[String]) {
val conf: SparkConf = new SparkConf().setAppName("TColumnModify")
val sc: SparkContext = new SparkContext(conf)
val vectorAcc = sc.accumulator(new MyVector())(VectorAccumulator)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val results = hiveContext.sql("SELECT amount, number FROM dmp_wr.test")
我该如何解决这个问题?感谢您的回复。
Edit1: 我找到了引发异常的 Spark 源代码行。看起来是这样
if(originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18)
因此,我创建了新的 table,其中包含 DECIMAL(18,7) 类型的列,并且我的代码按预期工作。
我删除 table 并创建一个包含 DECIMAL(24,7) 列的新列,之后我更改了列类型
alter table qwe change amount amount decimal(18,7)
我可以看到它已更改为 DECIMAL(18,7),但 Spark
不接受改变。它仍然将列类型读取为 DECIMAL(24,7) 并给出相同的错误。
主要原因是什么?
alter table qwe change amount amount decimal(18,7)
Hive 中的 Alter table 命令不会触及存储在 Hive 中的实际数据。它只会更改 Hive Metastore 中的元数据。这与普通数据库中的 "alter table" 命令(如 MySQL)非常不同。
当 Spark 从 Parquet 文件中读取数据时,它会尝试使用实际 Parquet 文件中的元数据来反序列化数据,这仍然会给它 DECIMAL(24, 7 ).
您的问题有 2 个解决方案:
1. 试用新版本的 Spark - 从主干构建。请参阅 https://issues.apache.org/jira/browse/SPARK-6777,它完全改变了这部分代码(虽然只会出现在 Spark 1.5 中),所以希望您不会再遇到同样的问题。
- 手动转换 table 中的数据。您可以使用像 "INSERT OVERWRITE TABLE new_table SELECT * from old_table") 这样的配置单元查询。
我正在尝试使用 SparkSql HiveContext 读取 Hive table。但是,当我提交作业时,出现以下错误:
Exception in thread "main" java.lang.RuntimeException: Unsupported parquet datatype optional fixed_len_byte_array(11) amount (DECIMAL(24,7))
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.toPrimitiveDataType(ParquetTypes.scala:77)
at org.apache.spark.sql.parquet.ParquetTypesConverter$.toDataType(ParquetTypes.scala:131)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes.apply(ParquetTypes.scala:383)
at org.apache.spark.sql.parquet.ParquetTypesConverter$$anonfun$convertToAttributes.apply(ParquetTypes.scala:380)
列类型为 DECIMAL(24,7)。我已经使用 HiveQL 更改了列类型,但它不起作用。我也试过在 sparksql 中转换为另一种 Decimal 类型,如下所示:
val results = hiveContext.sql("SELECT cast(amount as DECIMAL(18,7)), number FROM dmp_wr.test")
但是,我遇到了同样的错误。我的代码是这样的:
def main(args: Array[String]) {
val conf: SparkConf = new SparkConf().setAppName("TColumnModify")
val sc: SparkContext = new SparkContext(conf)
val vectorAcc = sc.accumulator(new MyVector())(VectorAccumulator)
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
val results = hiveContext.sql("SELECT amount, number FROM dmp_wr.test")
我该如何解决这个问题?感谢您的回复。
Edit1: 我找到了引发异常的 Spark 源代码行。看起来是这样
if(originalType == ParquetOriginalType.DECIMAL && decimalInfo.getPrecision <= 18)
因此,我创建了新的 table,其中包含 DECIMAL(18,7) 类型的列,并且我的代码按预期工作。
我删除 table 并创建一个包含 DECIMAL(24,7) 列的新列,之后我更改了列类型
alter table qwe change amount amount decimal(18,7)
我可以看到它已更改为 DECIMAL(18,7),但 Spark
不接受改变。它仍然将列类型读取为 DECIMAL(24,7) 并给出相同的错误。
主要原因是什么?
alter table qwe change amount amount decimal(18,7)
Hive 中的 Alter table 命令不会触及存储在 Hive 中的实际数据。它只会更改 Hive Metastore 中的元数据。这与普通数据库中的 "alter table" 命令(如 MySQL)非常不同。
当 Spark 从 Parquet 文件中读取数据时,它会尝试使用实际 Parquet 文件中的元数据来反序列化数据,这仍然会给它 DECIMAL(24, 7 ).
您的问题有 2 个解决方案: 1. 试用新版本的 Spark - 从主干构建。请参阅 https://issues.apache.org/jira/browse/SPARK-6777,它完全改变了这部分代码(虽然只会出现在 Spark 1.5 中),所以希望您不会再遇到同样的问题。
- 手动转换 table 中的数据。您可以使用像 "INSERT OVERWRITE TABLE new_table SELECT * from old_table") 这样的配置单元查询。