无法从Sqoop创建的Spark中的序列文件创建数据框
unable to create dataframe from sequence file in Spark created by Sqoop
我想读取 orders
数据并从中创建 RDD,它作为 sequence
文件存储在 cloudera
vm
的 hadoop fs 中。以下是我的步骤:
1) 将订单数据导入为序列文件:
sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba --password cloudera --table orders -m 1 --target-dir /ordersDataSet --as-sequencefile
2) 在 spark scala 中读取文件:
Spark 1.6
val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec => rec.toString())
3) 当我尝试从上面的 RDD 读取数据时,它抛出以下错误:
Caused by: java.io.IOException: WritableName can't load class: orders
at org.apache.hadoop.io.WritableName.getClass(WritableName.java:77)
at org.apache.hadoop.io.SequenceFile$Reader.getValueClass(SequenceFile.java:2108)
... 17 more
Caused by: java.lang.ClassNotFoundException: Class orders not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2185)
at org.apache.hadoop.io.WritableName.getClass(WritableName.java:75)
... 18 more
不知道为什么说找不到订单。我哪里错了?
我也参考了这两个链接的代码,但运气不好:
1) Refer sequence part
2) Refer step no. 8
sqoop 与它关系不大,这里是一个更现实的场景示例,其中 saveAsSequenceFile 始终假定 k、v 对 - 这可能对您有所帮助:
import org.apache.hadoop.io._
val RDD = sc.parallelize( List( (1, List("A", "B")) , (2, List("B", "C")) , (3, List("C", "D", "E")) ) )
val RDD2 = RDD.map(x => (x._1, x._2.mkString("/")))
RDD2.saveAsSequenceFile("/rushhour/seq-directory/2")
val sequence_data = sc.sequenceFile("/rushhour/seq-directory/*", classOf[IntWritable], classOf[Text])
.map{case (x, y) => (x.get(), y.toString().split("/")(0), y.toString().split("/")(1))}
sequence_data.collect
returns:
res20: Array[(Int, String, String)] = Array((1,A,B), (2,B,C), (3,C,D), (1,A,B), (2,B,C), (3,C,D))
我不确定你是想要 RDD 还是 DF,但是将 RDD 转换为 DF 当然是微不足道的。
我找到了解决我自己问题的方法。好吧,我将写一个冗长的解决方案,但我希望它能有所帮助。
1) 当我尝试使用 SQOOP
读取在 HDFS
中导入的数据时,由于以下原因出现错误:
A) 序列文件都是关于 key-value pair
。因此,当我使用 sqoop 导入它时,导入的数据不在键值对中,这就是读取它时抛出错误的原因。
B) 如果您尝试读取 few characters
,您可以从中找出在读取序列文件时作为输入传递所需的 two classes
,您将获得如下数据:
[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/pa* | head -c 300
SEQ!org.apache.hadoop.io.LongWritableorders�;�M��c�K�����@���-OCLOSED@���PENDING_PAYMENT@���/COMPLETE@���"{CLOSED@���cat: Unable to write to output stream.
上面你只能看到 one class
即 org.apache.hadoop.io.LongWritable
并且当我在读取序列数据时传递它时它会抛出 post 中提到的错误。
val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.LongWritable]).map(rec => rec.toString())
我不认为 B
点是该错误的主要原因,但我非常确定 A
点是该错误的真正罪魁祸首。
2) 以下是我解决问题的方法。
我使用 SQOOP
在其他目标中将数据作为 avro
data
文件导入。然后我使用以下方式从 avro 创建数据框:
scala> import com.databricks.spark.avro._;
scala> val avroData=sqlContext.read.avro("path")
现在我创建了 key-value pair
并将其保存为 sequence
文件
avroData.map(p=>(p(0).toString,(p(0)+"\t"+p(1)+"\t"+p(2)+"\t"+p(3)))).saveAsSequenceFile("/user/cloudera/problem5/sequence")
现在,当我尝试读取上述书面文件的 few
个字符时,它给了我 two classes
,这是我在读取文件时需要的,如下所示:
[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/part-00000 | head -c 300
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text^#%���8P���11 1374735600000 11599 CLOSED&2#2 1374735600000 256 PENDING_PAYMENT!33 1374735600000 12111 COMPLETE44 1374735600000 8827 CLOSED!55 1374735600000 11318 COMPLETE 66 1374cat: Unable to write to output stream.
scala> val sequenceData=sc.sequenceFile("/user/cloudera/problem5/sequence",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec=>rec.toString)
sequenceData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at map at <console>:30
现在,当我尝试打印数据时,它显示的数据如下:
scala> sequenceData.take(4).foreach(println)
(1,1 1374735600000 11599 CLOSED)
(2,2 1374735600000 256 PENDING_PAYMENT)
(3,3 1374735600000 12111 COMPLETE)
(4,4 1374735600000 8827 CLOSED)
最后但同样重要的是,感谢大家的辛勤付出。干杯!!
我想读取 orders
数据并从中创建 RDD,它作为 sequence
文件存储在 cloudera
vm
的 hadoop fs 中。以下是我的步骤:
1) 将订单数据导入为序列文件:
sqoop import --connect jdbc:mysql://localhost/retail_db --username retail_dba --password cloudera --table orders -m 1 --target-dir /ordersDataSet --as-sequencefile
2) 在 spark scala 中读取文件:
Spark 1.6
val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec => rec.toString())
3) 当我尝试从上面的 RDD 读取数据时,它抛出以下错误:
Caused by: java.io.IOException: WritableName can't load class: orders
at org.apache.hadoop.io.WritableName.getClass(WritableName.java:77)
at org.apache.hadoop.io.SequenceFile$Reader.getValueClass(SequenceFile.java:2108)
... 17 more
Caused by: java.lang.ClassNotFoundException: Class orders not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2185)
at org.apache.hadoop.io.WritableName.getClass(WritableName.java:75)
... 18 more
不知道为什么说找不到订单。我哪里错了?
我也参考了这两个链接的代码,但运气不好:
1) Refer sequence part
2) Refer step no. 8
sqoop 与它关系不大,这里是一个更现实的场景示例,其中 saveAsSequenceFile 始终假定 k、v 对 - 这可能对您有所帮助:
import org.apache.hadoop.io._
val RDD = sc.parallelize( List( (1, List("A", "B")) , (2, List("B", "C")) , (3, List("C", "D", "E")) ) )
val RDD2 = RDD.map(x => (x._1, x._2.mkString("/")))
RDD2.saveAsSequenceFile("/rushhour/seq-directory/2")
val sequence_data = sc.sequenceFile("/rushhour/seq-directory/*", classOf[IntWritable], classOf[Text])
.map{case (x, y) => (x.get(), y.toString().split("/")(0), y.toString().split("/")(1))}
sequence_data.collect
returns:
res20: Array[(Int, String, String)] = Array((1,A,B), (2,B,C), (3,C,D), (1,A,B), (2,B,C), (3,C,D))
我不确定你是想要 RDD 还是 DF,但是将 RDD 转换为 DF 当然是微不足道的。
我找到了解决我自己问题的方法。好吧,我将写一个冗长的解决方案,但我希望它能有所帮助。
1) 当我尝试使用 SQOOP
读取在 HDFS
中导入的数据时,由于以下原因出现错误:
A) 序列文件都是关于 key-value pair
。因此,当我使用 sqoop 导入它时,导入的数据不在键值对中,这就是读取它时抛出错误的原因。
B) 如果您尝试读取 few characters
,您可以从中找出在读取序列文件时作为输入传递所需的 two classes
,您将获得如下数据:
[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/pa* | head -c 300
SEQ!org.apache.hadoop.io.LongWritableorders�;�M��c�K�����@���-OCLOSED@���PENDING_PAYMENT@���/COMPLETE@���"{CLOSED@���cat: Unable to write to output stream.
上面你只能看到 one class
即 org.apache.hadoop.io.LongWritable
并且当我在读取序列数据时传递它时它会抛出 post 中提到的错误。
val sequenceData=sc.sequenceFile("/ordersDataSet",classOf[org.apache.hadoop.io.LongWritable],classOf[org.apache.hadoop.io.LongWritable]).map(rec => rec.toString())
我不认为 B
点是该错误的主要原因,但我非常确定 A
点是该错误的真正罪魁祸首。
2) 以下是我解决问题的方法。
我使用 SQOOP
在其他目标中将数据作为 avro
data
文件导入。然后我使用以下方式从 avro 创建数据框:
scala> import com.databricks.spark.avro._;
scala> val avroData=sqlContext.read.avro("path")
现在我创建了 key-value pair
并将其保存为 sequence
文件
avroData.map(p=>(p(0).toString,(p(0)+"\t"+p(1)+"\t"+p(2)+"\t"+p(3)))).saveAsSequenceFile("/user/cloudera/problem5/sequence")
现在,当我尝试读取上述书面文件的 few
个字符时,它给了我 two classes
,这是我在读取文件时需要的,如下所示:
[cloudera@quickstart ~]$ hadoop fs -cat /user/cloudera/problem5/sequence/part-00000 | head -c 300
SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text^#%���8P���11 1374735600000 11599 CLOSED&2#2 1374735600000 256 PENDING_PAYMENT!33 1374735600000 12111 COMPLETE44 1374735600000 8827 CLOSED!55 1374735600000 11318 COMPLETE 66 1374cat: Unable to write to output stream.
scala> val sequenceData=sc.sequenceFile("/user/cloudera/problem5/sequence",classOf[org.apache.hadoop.io.Text],classOf[org.apache.hadoop.io.Text]).map(rec=>rec.toString)
sequenceData: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at map at <console>:30
现在,当我尝试打印数据时,它显示的数据如下:
scala> sequenceData.take(4).foreach(println)
(1,1 1374735600000 11599 CLOSED)
(2,2 1374735600000 256 PENDING_PAYMENT)
(3,3 1374735600000 12111 COMPLETE)
(4,4 1374735600000 8827 CLOSED)
最后但同样重要的是,感谢大家的辛勤付出。干杯!!