NumberFormatException when I try to create a parquet file with a custom schema and BigDecimal types
I need to create a Parquet file from a CSV file using a custom JSON schema file, like this one:
{"type" : "struct","fields" : [ {"name" : "tenor_bank","type" : "string","nullable" : false}, {"name":"tenor_frtb", "type":"string", "nullable":false}, {"name":"weight", "type":"decimal(25,5)", "nullable":false} ]}
Please take a look at the field named weight.
This is what the input CSV file looks like:
tenor_1;tenor_2;weight
1D;3M;1
7D;3M;1
1W;3M;1
1OD;3M;1
14D;3M;1
2W;3M;1
21D;3M;1
3W;3M;1
28D;3M;1
30D;3M;1
1M;3M;1
56D;3M;1
60D;3M;1
2M;3M;1
61D;3M;1
84D;3M;1
90D;3M;1
3M;3M;1
91D;3M;1
92D;3M;1
112D;3M;0.8333
112D;6M;0.1667
This is how I load the schema JSON file and build the DataFrame from it:
val path: Path = new Path(mra_schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)
val inputStream: FSDataInputStream = fileSystem.open(path)
val schema_json = Stream.cons(inputStream.readLine(), Stream.continually(inputStream.readLine))
logger.debug("schema_json looks like " + schema_json.head)
val mySchemaStructType = DataType.fromJson(schema_json.head).asInstanceOf[StructType]
logger.debug("mySchemaStructType is " + mySchemaStructType)
myDF = loadCSV(sqlContext, path_input_csv, separator, mySchemaStructType, header)
logger.debug("myDF.schema.json looks like " + myDF.schema.json)
inputStream.close()
// Finally I create the Parquet file. This call provokes the NumberFormatException, concretely the line with .parquet(pathParquet)
writeDataFrame2Parquet(myDF, path_output_parquet, saveMode,header,separator)
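As an aside, the Stream-based read above works only because the whole schema JSON sits on a single line. A minimal whole-file sketch, assuming scala.io.Source is acceptable here (the val names are mine, not from the original code):

import scala.io.Source
import org.apache.spark.sql.types.{DataType, StructType}

// Read the entire schema file instead of just its first line.
val schemaText = Source.fromInputStream(fileSystem.open(path)).mkString
val schemaFromFile = DataType.fromJson(schemaText).asInstanceOf[StructType]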
//some utilities
def loadCSV(sqlContext: SQLContext, pathCSV: String, separator: String, customSchema: StructType, haveSchema: String): DataFrame = {
  logger.info("loadCSV. header is " + haveSchema + ", inferSchema is false, pathCSV is " + pathCSV + ", separator is " + separator)
  sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", haveSchema) // use the first line of every file as the header
    .option("delimiter", separator)
    .option("nullValue", "")
    // FAILFAST makes the load blow up at runtime as soon as a line fails to parse
    .option("mode", "FAILFAST")
    .schema(customSchema)
    .load(pathCSV)
}
def writeDataFrame2Parquet(df: DataFrame, pathParquet: String, saveMode: SaveMode, header: String, delimiter: String): Unit = {
  df.write
    .format("com.databricks.spark.csv")
    .option("header", header)
    .option("delimiter", delimiter)
    .option("nullValue", "")
    .mode(saveMode)
    // note: .parquet() switches the format to Parquet, so the CSV format
    // and options above have no effect on the written file
    .parquet(pathParquet)
}
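Since .parquet() is what actually determines the output format, a writer without the inert CSV options can be much smaller. A sketch (writeParquetOnly is a hypothetical name, not from the post):

import org.apache.spark.sql.{DataFrame, SaveMode}

// Write the DataFrame as Parquet; no CSV options are needed on this path.
def writeParquetOnly(df: DataFrame, pathParquet: String, saveMode: SaveMode): Unit =
  df.write.mode(saveMode).parquet(pathParquet)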
When execution reaches the last line, .parquet(pathParquet), this exception is thrown:
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$$anonfun$apply$mcV$sp.apply(InsertIntoHadoopFsRelation.scala:150)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
**Caused by: java.lang.NumberFormatException**
at java.math.BigDecimal.<init>(BigDecimal.java:545)
at java.math.BigDecimal.<init>(BigDecimal.java:739)
at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:68)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan.apply(CsvRelation.scala:121)
at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan.apply(CsvRelation.scala:108)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:240)
... 8 more
It looks like the library crashes when spark-csv tries to cast the "weight" field to decimal(25,5). Can anyone help me out?
Thanks.
Just replace the comma with a dot: 0,8333 becomes 0.8333, because, as you can see:
scala> BigDecimal("0.8333")
res16: scala.math.BigDecimal = 0.8333
scala> BigDecimal("0,8333")
java.lang.NumberFormatException
at java.math.BigDecimal.<init>(BigDecimal.java:494)
at java.math.BigDecimal.<init>(BigDecimal.java:383)
at java.math.BigDecimal.<init>(BigDecimal.java:806)
at scala.math.BigDecimal$.exact(BigDecimal.scala:125)
at scala.math.BigDecimal$.apply(BigDecimal.scala:283)
... 33 elided
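If fixing the source CSV is not an option, a workaround is to declare weight as string in the JSON schema and normalize it after loading. (Note that the exception only surfaces at .parquet(pathParquet) because Spark is lazy: the CSV rows are actually parsed when the write action runs.) A minimal sketch, assuming Spark 1.5+ for regexp_replace (normalizeDecimal is a hypothetical helper, not part of the original code):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, regexp_replace}
import org.apache.spark.sql.types.DecimalType

// Replace the decimal comma with a dot, then cast the column to decimal(25,5).
def normalizeDecimal(df: DataFrame, column: String): DataFrame =
  df.withColumn(column,
    regexp_replace(col(column), ",", ".").cast(DecimalType(25, 5)))

// usage: writeDataFrame2Parquet(normalizeDecimal(myDF, "weight"), path_output_parquet, saveMode, header, separator)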