Read CSV file in Spark and insert it to HBase using created RDD

I am able to insert data into an HBase table using the Put method. But now I want to read the data from a CSV file and write it into an HBase table.

I wrote the following code:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object Read_CSV_Method2 {

    case class HotelBooking(hotelName : String, is_cancelled : String, reservation_status : String, 
           reservation_status_date : String)


    def main(args : Array[String]){             

        Logger.getLogger("org").setLevel(Level.ERROR)
        val sparkConf = new SparkConf().setAppName("Spark_Hbase_Connection").setMaster("local[*]")
        val sc = new SparkContext(sparkConf)
        val conf = HBaseConfiguration.create()    

        // establish a connection
        val connection:Connection = ConnectionFactory.createConnection(conf)

        // Table on which different commands have to be run.
        val tableName = connection.getTable(TableName.valueOf("hotelTable"))

        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val textRDD = sc.textFile("/Users/sukritmehta/Documents/hotel_bookings.csv")

        var c = 1
        textRDD.foreachPartition{
                   iter => 
                     val hbaseConf = HBaseConfiguration.create()
                     val connection:Connection = ConnectionFactory.createConnection(hbaseConf)
                     val myTable = connection.getTable(TableName.valueOf("hotelTable"))
                     iter.foreach{
                       f =>

                         val col = f.split(",")
                         val insertData = new Put(Bytes.toBytes(c.toString()))
                         insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("hotelName"), Bytes.toBytes(col(0).toString()))
                         insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("is_canceled"), Bytes.toBytes(col(1).toString()))
                         insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status"), Bytes.toBytes(col(30).toString()))
                         insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status_date"), Bytes.toBytes(col(31).toString()))
                         c = c+1
                         myTable.put(insertData)
                     }
            }

       }

    }

But after running this code, I get the following error:

Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: org/apache/hadoop/mapred/InputSplitWithLocationInfo
    at org.apache.spark.rdd.HadoopRDD.getPreferredLocations(HadoopRDD.scala:356)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations.apply(RDD.scala:275)
    at org.apache.spark.rdd.RDD$$anonfun$preferredLocations.apply(RDD.scala:275)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:274)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:2003)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$$anonfun$apply.apply$mcVI$sp(DAGScheduler.scala:2014)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$$anonfun$apply.apply(DAGScheduler.scala:2013)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$$anonfun$apply.apply(DAGScheduler.scala:2013)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal.apply(DAGScheduler.scala:2013)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal.apply(DAGScheduler.scala:2011)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:2011)
    at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1977)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun.apply(DAGScheduler.scala:1112)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun.apply(DAGScheduler.scala:1110)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1110)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:1069)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1013)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2065)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapred.InputSplitWithLocationInfo
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 32 more

My pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Spark_Hbase</groupId>
  <artifactId>Spark_Hbase</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
        <!-- Scala and Spark dependencies -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.11</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-avro_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.4.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.2.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>1.2.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>2.0.0-alpha4</version> <!-- Hortonworks Latest -->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-mapreduce</artifactId>
            <version>2.2.4</version>
        </dependency>
    </dependencies>
</project>

It would be great if someone could help me out with this.

I think you are missing:

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.5</version>
            <scope>provided</scope>
        </dependency>
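
Your pom pulls in hadoop-core 1.2.1, a Hadoop 1.x artifact that predates org.apache.hadoop.mapred.InputSplitWithLocationInfo. Spark's HadoopRDD looks that class up when computing preferred locations, which is why the DAG scheduler throws the NoClassDefFoundError. Adding a Hadoop 2.x hadoop-mapreduce-client-core (as above) puts the class on the classpath; the old hadoop-core 1.2.1 dependency is probably better removed, since Hadoop 1.x and 2.x artifacts can conflict.

Separately, once the classpath is fixed, the row-key counter in the posted code is likely to misbehave: c is captured by the foreachPartition closure, so each partition increments its own copy and row keys collide across partitions. Below is a minimal, untested sketch of the same write, assuming the hotelTable table and hotelFam column family from the question, using zipWithIndex to derive unique row keys and closing the HBase resources:

    textRDD.zipWithIndex().foreachPartition { iter =>
      // Open one connection per partition; HBase connections are not serializable,
      // so they cannot be created on the driver and shipped to executors.
      val hbaseConf = HBaseConfiguration.create()
      val connection = ConnectionFactory.createConnection(hbaseConf)
      val myTable = connection.getTable(TableName.valueOf("hotelTable"))
      try {
        iter.foreach { case (line, idx) =>
          val col = line.split(",")
          // idx is the global line index, so row keys are unique across partitions.
          val insertData = new Put(Bytes.toBytes(idx.toString))
          insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("hotelName"), Bytes.toBytes(col(0)))
          insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("is_canceled"), Bytes.toBytes(col(1)))
          insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status"), Bytes.toBytes(col(30)))
          insertData.addColumn(Bytes.toBytes("hotelFam"), Bytes.toBytes("reservation_status_date"), Bytes.toBytes(col(31)))
          myTable.put(insertData)
        }
      } finally {
        // Always release the table and connection, even if a row fails to parse.
        myTable.close()
        connection.close()
      }
    }

For larger files you would batch the Puts (Table.put accepts a java.util.List) instead of writing one row at a time, but the structure above is enough to show the fix.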