Read a CSV file in Spark and insert it into HBase using the created RDD
I am able to insert data into an HBase table using the Put method. However, I now want to read the data from a CSV file and write it into the HBase table.
I wrote the following code:
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes

object Read_CSV_Method2 {

  case class HotelBooking(hotelName: String, is_cancelled: String, reservation_status: String,
                          reservation_status_date: String)

  def main(args: Array[String]) {
    Logger.getLogger("org").setLevel(Level.ERROR)

    val sparkConf = new SparkConf().setAppName("Spark_Hbase_Connection").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    val conf = HBaseConfiguration.create()
    // establish a connection
    val connection: Connection = ConnectionFactory.createConnection(conf)
    // Table on which different commands have to be run.
    val tableName = connection.getTable(TableName.valueOf("hotelTable"))

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val textRDD = sc.textFile("/Users/sukritmehta/Documents/hotel_bookings.csv")
    var c = 1

    textRDD.foreachPartition { iter =>
      val hbaseConf = HBaseConfiguration.create()
      val connection: Connection = ConnectionFactory.createConnection(hbaseConf)
      val myTable = connection.getTable(TableName.valueOf("hotelTable"))
      iter.foreach { f =>
        val col = f.split(",")
        val insertData = new Put(Bytes.toBytes(c.toString()))
        insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("hotelName"), Bytes.toBytes(col(0).toString()))
        insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("is_canceled"), Bytes.toBytes(col(1).toString()))
        insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status"), Bytes.toBytes(col(30).toString()))
        insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status_date"), Bytes.toBytes(col(31).toString()))
        c = c + 1
        myTable.put(insertData)
      }
    }
  }
}
But after running this code, I get the following error:
Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputSplitWithLocationInfo
at org.apache.spark.rdd.HadoopRDD.getPreferredLocations(HadoopRDD.scala:356)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations.apply(RDD.scala:275)
at org.apache.spark.rdd.RDD$$anonfun$preferredLocations.apply(RDD.scala:275)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:2003)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$$anonfun$apply.apply$mcVI$sp(DAGScheduler.scala:2014)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$$anonfun$apply.apply(DAGScheduler.scala:2013)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$$anonfun$apply.apply(DAGScheduler.scala:2013)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal.apply(DAGScheduler.scala:2013)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal.apply(DAGScheduler.scala:2011)
at scala.collection.immutable.List.foreach(List.scala:392)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:2011)
at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1977)
at org.apache.spark.scheduler.DAGScheduler$$anonfun.apply(DAGScheduler.scala:1112)
at org.apache.spark.scheduler.DAGScheduler$$anonfun.apply(DAGScheduler.scala:1110)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1110)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1069)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1013)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2065)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.InputSplitWithLocationInfo
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 32 more
pom.xml file:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Spark_Hbase</groupId>
  <artifactId>Spark_Hbase</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <dependencies>
    <!-- Scala and Spark dependencies -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.11</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.4.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-avro_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.0</version>
      <scope>provided</scope>
    </dependency>
    <!-- <dependency> <groupId>org.codehaus.janino</groupId> <artifactId>commons-compiler</artifactId>
         <version>3.0.7</version> </dependency> <dependency> <groupId>org.apache.spark</groupId>
         <artifactId>spark-network-common_2.11</artifactId> <version>2.1.1</version>
         </dependency> -->
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.9</version>
    </dependency>
    <!-- <dependency> <groupId>org.mongodb.spark</groupId> <artifactId>mongo-spark-connector_2.10</artifactId>
         <version>2.1.1</version> </dependency> -->
    <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_2.10</artifactId>
         <version>2.1.1</version> </dependency> -->
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>2.2.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-core</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-spark</artifactId>
      <version>2.0.0-alpha4</version> <!-- Hortonworks Latest -->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.2.4</version>
    </dependency>
  </dependencies>
</project>
It would be great if someone could help me resolve this issue.
I think you are missing:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-mapreduce-client-core</artifactId>
  <version>2.6.5</version>
  <scope>provided</scope>
</dependency>

The class your stack trace complains about, org.apache.hadoop.mapred.InputSplitWithLocationInfo, ships in hadoop-mapreduce-client-core, so Spark's HadoopRDD cannot load it unless that artifact is on the runtime classpath.
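Once the dependency issue is resolved, note a separate problem in the posted code: var c lives on the driver, so each partition in foreachPartition works on its own serialized copy starting at 1, and rows written from different partitions overwrite one another. Below is a minimal sketch of one way around this. It reuses the table name, column family, column indices, and file path from the question, and assumes a headerless CSV with no quoted commas; zipWithIndex supplies a globally unique index per line to use as the row key, and the puts are batched per partition.

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.JavaConverters._

object CsvToHBaseSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("CsvToHBaseSketch").setMaster("local[*]"))

    // zipWithIndex assigns a stable, globally unique index to every line,
    // which is safe to use as a row key (unlike a driver-side var counter).
    val lines = sc.textFile("/Users/sukritmehta/Documents/hotel_bookings.csv").zipWithIndex()

    lines.foreachPartition { iter =>
      // One HBase connection per partition, closed when the partition is done.
      val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
      val table = connection.getTable(TableName.valueOf("hotelTable"))
      try {
        val puts = iter.map { case (line, idx) =>
          val col = line.split(",", -1) // -1 keeps trailing empty fields
          val put = new Put(Bytes.toBytes(idx.toString))
          put.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("hotelName"), Bytes.toBytes(col(0)))
          put.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("is_canceled"), Bytes.toBytes(col(1)))
          put.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status"), Bytes.toBytes(col(30)))
          put.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status_date"), Bytes.toBytes(col(31)))
          put
        }.toList
        table.put(puts.asJava) // one batched put per partition
      } finally {
        table.close()
        connection.close()
      }
    }

    sc.stop()
  }
}

If the CSV does contain a header row or quoted fields, reading it with Spark's CSV reader (for example sqlContext.read.option("header", "true").csv(path)) and iterating over the resulting DataFrame would be more robust than splitting on commas by hand.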