将 RDD 保存到 HDFS 时出错

Error saving RDD into HDFS

我正在尝试使用 Scala 将 RDD 保存到 HDFS 中,但出现此错误:

WARN scheduler.TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, quickstart.cloudera, executor 3): java.lang.NumberFormatException: empty String
        at sun.misc.FloatingDecimal.readJavaFormatString(FloatingDecimal.java:1020)
        at java.lang.Float.parseFloat(Float.java:452)
        at scala.collection.immutable.StringLike$class.toFloat(StringLike.scala:231)
        at scala.collection.immutable.StringOps.toFloat(StringOps.scala:31)
        at $line24.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:33)
        at $line24.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun.apply(<console>:33)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at scala.collection.Iterator$$anon.next(Iterator.scala:328)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply$mcV$sp(PairRDDFunctions.scala:1196)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1195)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun$$anonfun$apply.apply(PairRDDFunctions.scala:1195)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1279)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1203)
        at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$$anonfun.apply(PairRDDFunctions.scala:1183)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:242)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

首先,我读取了一个位于 HDFS 中的文件,它读取正确。之后,我尝试进行一些转换,例如更改字段分隔符(管道),然后将其写回 HDFS。如果有人可以帮助我,这是我的代码。

val productsRDD= sc.textFile("/user/cloudera/products/products")
val products2RDD=productsRDD.map(a=>a.split(","))
case class clas1(product_id: Int,product_category_id: Int,product_name: String,product_description: String,product_price: Float,product_image: String)
val products = products2RDD.map(b => clas1(Integer.parseInt(0),Integer.parseInt(1),(2).toString,(3).toString,(4).toFloat,(5).toString))
val r = products.toDF()
r.registerTempTable("productsDF")
val prodDF = sqlContext.sql("select * from productsDF where product_price > 100")

/* everything goes fine until this line*/

prodDF.map(c => c(0)+"|"+c(1)+"|"+c(2)+"|"+c(3)+"|"+c(4)+"|"+c(5)).saveAsTextFile("/user/cloudera/problem1/pipes1")

数据框字段:

| Field               | Type         | Null | Key | Default | Extra          |
+---------------------+--------------+------+-----+---------+----------------+
| product_id          | int(11)      | NO   | PRI | NULL    | auto_increment |
| product_category_id | int(11)      | NO   |     | NULL    |                |
| product_name        | varchar(45)  | NO   |     | NULL    |                |
| product_description | varchar(255) | NO   |     | NULL    |                |
| product_price       | float        | NO   |     | NULL    |                |
| product_image       | varchar(255) | NO   |     | NULL    |                |

我是 Scala 的新手,感谢您的帮助... 谢谢!

根据您的 CDH 版本,Spark2 具有 内置 CSV reader。

case class Product(product_id: Int,product_category_id: Int,product_name: String,product_description: String,product_price: Float,product_image: String)

val productsDs = spark.csv("/user/cloudera/products/products").as[Product]
val expensiveProducts = productDs.where($"product_price" > 100.0)

如果不使用 Spark2,您绝对应该升级一些本地客户端以指向您的同一个 YARN 集群,或者使用 spark-csv 不必处理 map(... split(","))[= 的糟糕 CSV 解析器14=]

注意:如果您的列为空,我不知道案例 class 是否仍然有效,如错误所述

如果您只想更改分隔符,您也可以使用 CSV 格式化程序将其写出来

expensiveProducts.write
    .option("sep", "|")
    .csv("/user/cloudera/problem1/pipes1")

从您的错误来看 - java.lang.NumberFormatException:空字符串

当您尝试从字符串为空的字符串中解析整数时,您的错误似乎存在,因此您会遇到这个特定的错误。

你可以做的是在进行转换之前和拆分之后使用合并。创建一个数据框,spark-sql 中有一个合并功能,它将用 "NULL"

替换您的空值