如何使用 Scala 在 Spark 中连接(join)两个 HBase 表
How to join two Hbase tables in Spark using Scala
我在 HBase 中有两个表,需要使用 Scala 对它们做连接(join)。这些表是使用 Sqoop 从 Oracle 导入的,可以在 Hue 数据浏览器中查询。
使用 Spark 1.5、Scala 2.10.4。
我正在使用此处的 HBase 数据连接器:https://github.com/nerdammer/spark-hbase-connector
import it.nerdammer.spark.hbase._
import org.apache.hadoop.hbase.client.{ HBaseAdmin, Result }
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
import it.nerdammer.spark.hbase.conversion.{ FieldReader, FieldWriter }
import org.apache.hadoop.hbase.util.Bytes
/**
 * One row of the `ARTISTS` HBase table.
 *
 * @param id   first value handed to `artistReader` (the row key in spark-hbase-connector)
 * @param name value of the `NAME` column
 * @param age  value of the `AGE` column
 */
case class Artist(
  id: String,
  name: String,
  age: Int
)
/**
 * One row of the `CDS` HBase table.
 *
 * @param id       first value handed to `cdReader` (the row key in spark-hbase-connector)
 * @param artistId value of the `ARTIST_ID` column; used as the join key against `Artist.id`
 * @param title    value of the `TITLE` column
 * @param year     value of the `YEAR` column
 */
case class Cd(
  id: String,
  artistId: String,
  title: String,
  year: Int
)
/**
 * Joined artist/CD record that is written to the `ARTIST_CD` HBase table.
 *
 * @param id    value emitted first by `artistCdWriter` (the row key in spark-hbase-connector)
 * @param name  artist name, stored in the `NAME` column
 * @param title CD title, stored in the `TITLE` column
 * @param year  CD year, stored in the `YEAR` column
 */
case class ArtistCd(
  id: String,
  name: String,
  title: String,
  year: Int
)
/**
 * Reader that rebuilds an [[Artist]] from the raw cells of one HBase row.
 *
 * The cells are consumed in order: first value, then the `NAME` and `AGE` columns.
 * A missing cell (`None`) or a row with fewer than three values fails with
 * `NoSuchElementException`.
 */
implicit def artistReader: FieldReader[Artist] = new FieldReader[Artist] {

  // Column qualifiers requested from HBase, in the order `map` consumes them
  // (after the leading value, which spark-hbase-connector fills with the row key).
  override def columns = Seq("NAME", "AGE")

  override def map(data: HBaseData): Artist = {
    // Walk the row once instead of re-dropping from the start for every field.
    val cells = data.iterator
    val idBytes = cells.next().get
    val nameBytes = cells.next().get
    // NOTE(review): Bytes.toInt expects a 4-byte binary int. Sqoop imports often
    // store numbers as text — confirm how AGE is encoded in the source table.
    val ageBytes = cells.next().get
    Artist(
      id = Bytes.toString(idBytes),
      name = Bytes.toString(nameBytes),
      age = Bytes.toInt(ageBytes)
    )
  }
}
/**
 * Reader that rebuilds a [[Cd]] from the raw cells of one HBase row.
 *
 * The cells are consumed in order: first value, then the `ARTIST_ID`, `TITLE`
 * and `YEAR` columns. A missing cell (`None`) or a row with fewer than four
 * values fails with `NoSuchElementException`.
 */
implicit def cdReader: FieldReader[Cd] = new FieldReader[Cd] {

  // Column qualifiers requested from HBase, in the order `map` consumes them
  // (after the leading value, which spark-hbase-connector fills with the row key).
  override def columns = Seq("ARTIST_ID", "TITLE", "YEAR")

  override def map(data: HBaseData): Cd = {
    // Walk the row once instead of re-dropping from the start for every field.
    val cells = data.iterator
    val idBytes = cells.next().get
    val artistIdBytes = cells.next().get
    val titleBytes = cells.next().get
    // NOTE(review): Bytes.toInt expects a 4-byte binary int. Sqoop imports often
    // store numbers as text — confirm how YEAR is encoded in the source table.
    val yearBytes = cells.next().get
    Cd(
      id = Bytes.toString(idBytes),
      artistId = Bytes.toString(artistIdBytes),
      title = Bytes.toString(titleBytes),
      year = Bytes.toInt(yearBytes)
    )
  }
}
/**
 * Writer that serialises an [[ArtistCd]] into the cells of one HBase row.
 *
 * Emits four values: `id` first, followed by the values for the `NAME`,
 * `TITLE` and `YEAR` columns. `year` is written with `Bytes.toBytes(Int)`.
 */
implicit def artistCdWriter: FieldWriter[ArtistCd] = new FieldWriter[ArtistCd] {

  // Column qualifiers for every emitted value except the leading `id`.
  override def columns = Seq("NAME", "TITLE", "YEAR")

  override def map(data: ArtistCd): HBaseData = {
    val idBytes = Bytes.toBytes(data.id)
    val nameBytes = Bytes.toBytes(data.name)
    val titleBytes = Bytes.toBytes(data.title)
    val yearBytes = Bytes.toBytes(data.year)
    Seq[Option[Array[Byte]]](
      Some(idBytes),
      Some(nameBytes),
      Some(titleBytes),
      Some(yearBytes)
    )
  }
}
// Driver script: reads ARTISTS and CDS from HBase, joins them on the artist id
// and writes the combined records to ARTIST_CD.

// NOTE(review): 7337 is commonly the external shuffle service port, while a
// standalone master usually listens on 7077 — confirm the master URL.
val conf = new SparkConf().setAppName("HBase Join").setMaster("spark://localhost:7337")
val sc = new SparkContext(conf)

val artistRDD = sc.hbaseTable[Artist]("ARTISTS").inColumnFamily("cf")
val cdRDD = sc.hbaseTable[Cd]("CDS").inColumnFamily("cf")

// Both sides are keyed by the artist id so that `join` pairs each CD with its artist.
val artistById = artistRDD.keyBy(_.id)
val cdById = cdRDD.keyBy(_.artistId)
val artistcd = artistById.join(cdById)

// Named arguments keep every field in its own column; the previous positional
// call passed title and name in swapped positions.
// NOTE(review): the artist id is used as the output id, so several CDs of the
// same artist share one id — confirm whether cd.id should be used instead.
val artistCdRDD = artistcd.map { case (_, (artist, cd)) =>
  ArtistCd(id = artist.id, name = artist.name, title = cd.title, year = cd.year)
}

// The ARTIST_CD table and its "cf" column family must already exist:
// save() does not create them, and the write fails with
// RetriesExhaustedWithDetailsException when they are missing.
artistCdRDD.toHBaseTable("ARTIST_CD").inColumnFamily("cf").save()

// Release cluster resources and report success (exit status 0, not 1).
sc.stop()
System.exit(0)
当我运行这段代码时,得到以下异常:
16/01/22 14:27:04 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 3, localhost): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2068 actions: ARTIST_CD: 2068 times,
at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:227)
at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:207)
at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1663)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:208)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doMutate(BufferedMutatorImpl.java:141)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:98)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:129)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1042)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
如果有人有这方面的经验,我将非常感谢你的帮助
我在这里看到了两种解决方案:“How to Join two tables in Hbase”和“how to join tables in hbase”。不幸的是,这两种解决方案都不适合我。
想通了——首先,要写入的目标表(这里是 ARTIST_CD)必须事先已经存在。
我原以为 save() 命令会自动创建它,但实际上并不会。
此外,目标表还必须包含您要写入的列族——这里是 "cf"。
示例 1)
spark-shell --driver-class-path={put apache lib path}:{put hbase lib path}
spark-shell --driver-class-path="/usr/local/Cellar/apache-spark/2.4.0/libexec/jars/*:/usr/local/Cellar/hbase-1.4.9/lib/*"
示例 2)
spark-shell --driver-class-path=$SPARK_HOME:$(hbase classpath)
我在 HBase 中有两个表,需要使用 Scala 对它们做连接(join)。这些表是使用 Sqoop 从 Oracle 导入的,可以在 Hue 数据浏览器中查询。
使用 Spark 1.5、Scala 2.10.4。
我正在使用此处的 HBase 数据连接器:https://github.com/nerdammer/spark-hbase-connector
import it.nerdammer.spark.hbase._
import org.apache.hadoop.hbase.client.{ HBaseAdmin, Result }
import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor }
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark._
import it.nerdammer.spark.hbase.conversion.{ FieldReader, FieldWriter }
import org.apache.hadoop.hbase.util.Bytes
/**
 * One row of the `ARTISTS` HBase table.
 *
 * @param id   first value handed to `artistReader` (the row key in spark-hbase-connector)
 * @param name value of the `NAME` column
 * @param age  value of the `AGE` column
 */
case class Artist(
  id: String,
  name: String,
  age: Int
)
/**
 * One row of the `CDS` HBase table.
 *
 * @param id       first value handed to `cdReader` (the row key in spark-hbase-connector)
 * @param artistId value of the `ARTIST_ID` column; used as the join key against `Artist.id`
 * @param title    value of the `TITLE` column
 * @param year     value of the `YEAR` column
 */
case class Cd(
  id: String,
  artistId: String,
  title: String,
  year: Int
)
/**
 * Joined artist/CD record that is written to the `ARTIST_CD` HBase table.
 *
 * @param id    value emitted first by `artistCdWriter` (the row key in spark-hbase-connector)
 * @param name  artist name, stored in the `NAME` column
 * @param title CD title, stored in the `TITLE` column
 * @param year  CD year, stored in the `YEAR` column
 */
case class ArtistCd(
  id: String,
  name: String,
  title: String,
  year: Int
)
/**
 * Reader that rebuilds an [[Artist]] from the raw cells of one HBase row.
 *
 * The cells are consumed in order: first value, then the `NAME` and `AGE` columns.
 * A missing cell (`None`) or a row with fewer than three values fails with
 * `NoSuchElementException`.
 */
implicit def artistReader: FieldReader[Artist] = new FieldReader[Artist] {

  // Column qualifiers requested from HBase, in the order `map` consumes them
  // (after the leading value, which spark-hbase-connector fills with the row key).
  override def columns = Seq("NAME", "AGE")

  override def map(data: HBaseData): Artist = {
    // Walk the row once instead of re-dropping from the start for every field.
    val cells = data.iterator
    val idBytes = cells.next().get
    val nameBytes = cells.next().get
    // NOTE(review): Bytes.toInt expects a 4-byte binary int. Sqoop imports often
    // store numbers as text — confirm how AGE is encoded in the source table.
    val ageBytes = cells.next().get
    Artist(
      id = Bytes.toString(idBytes),
      name = Bytes.toString(nameBytes),
      age = Bytes.toInt(ageBytes)
    )
  }
}
/**
 * Reader that rebuilds a [[Cd]] from the raw cells of one HBase row.
 *
 * The cells are consumed in order: first value, then the `ARTIST_ID`, `TITLE`
 * and `YEAR` columns. A missing cell (`None`) or a row with fewer than four
 * values fails with `NoSuchElementException`.
 */
implicit def cdReader: FieldReader[Cd] = new FieldReader[Cd] {

  // Column qualifiers requested from HBase, in the order `map` consumes them
  // (after the leading value, which spark-hbase-connector fills with the row key).
  override def columns = Seq("ARTIST_ID", "TITLE", "YEAR")

  override def map(data: HBaseData): Cd = {
    // Walk the row once instead of re-dropping from the start for every field.
    val cells = data.iterator
    val idBytes = cells.next().get
    val artistIdBytes = cells.next().get
    val titleBytes = cells.next().get
    // NOTE(review): Bytes.toInt expects a 4-byte binary int. Sqoop imports often
    // store numbers as text — confirm how YEAR is encoded in the source table.
    val yearBytes = cells.next().get
    Cd(
      id = Bytes.toString(idBytes),
      artistId = Bytes.toString(artistIdBytes),
      title = Bytes.toString(titleBytes),
      year = Bytes.toInt(yearBytes)
    )
  }
}
/**
 * Writer that serialises an [[ArtistCd]] into the cells of one HBase row.
 *
 * Emits four values: `id` first, followed by the values for the `NAME`,
 * `TITLE` and `YEAR` columns. `year` is written with `Bytes.toBytes(Int)`.
 */
implicit def artistCdWriter: FieldWriter[ArtistCd] = new FieldWriter[ArtistCd] {

  // Column qualifiers for every emitted value except the leading `id`.
  override def columns = Seq("NAME", "TITLE", "YEAR")

  override def map(data: ArtistCd): HBaseData = {
    val idBytes = Bytes.toBytes(data.id)
    val nameBytes = Bytes.toBytes(data.name)
    val titleBytes = Bytes.toBytes(data.title)
    val yearBytes = Bytes.toBytes(data.year)
    Seq[Option[Array[Byte]]](
      Some(idBytes),
      Some(nameBytes),
      Some(titleBytes),
      Some(yearBytes)
    )
  }
}
// Driver script: reads ARTISTS and CDS from HBase, joins them on the artist id
// and writes the combined records to ARTIST_CD.

// NOTE(review): 7337 is commonly the external shuffle service port, while a
// standalone master usually listens on 7077 — confirm the master URL.
val conf = new SparkConf().setAppName("HBase Join").setMaster("spark://localhost:7337")
val sc = new SparkContext(conf)

val artistRDD = sc.hbaseTable[Artist]("ARTISTS").inColumnFamily("cf")
val cdRDD = sc.hbaseTable[Cd]("CDS").inColumnFamily("cf")

// Both sides are keyed by the artist id so that `join` pairs each CD with its artist.
val artistById = artistRDD.keyBy(_.id)
val cdById = cdRDD.keyBy(_.artistId)
val artistcd = artistById.join(cdById)

// Named arguments keep every field in its own column; the previous positional
// call passed title and name in swapped positions.
// NOTE(review): the artist id is used as the output id, so several CDs of the
// same artist share one id — confirm whether cd.id should be used instead.
val artistCdRDD = artistcd.map { case (_, (artist, cd)) =>
  ArtistCd(id = artist.id, name = artist.name, title = cd.title, year = cd.year)
}

// The ARTIST_CD table and its "cf" column family must already exist:
// save() does not create them, and the write fails with
// RetriesExhaustedWithDetailsException when they are missing.
artistCdRDD.toHBaseTable("ARTIST_CD").inColumnFamily("cf").save()

// Release cluster resources and report success (exit status 0, not 1).
sc.stop()
System.exit(0)
当我运行这段代码时,得到以下异常:
16/01/22 14:27:04 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 3, localhost): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2068 actions: ARTIST_CD: 2068 times,
at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:227)
at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:207)
at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1663)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:208)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.doMutate(BufferedMutatorImpl.java:141)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.mutate(BufferedMutatorImpl.java:98)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:129)
at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:85)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply$mcV$sp(PairRDDFunctions.scala:1036)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12$$anonfun$apply$4.apply(PairRDDFunctions.scala:1034)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1206)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1042)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsNewAPIHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1014)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
如果有人有这方面的经验,我将非常感谢你的帮助
我在这里看到了两种解决方案:“How to Join two tables in Hbase”和“how to join tables in hbase”。不幸的是,这两种解决方案都不适合我。
想通了——首先,要写入的目标表(这里是 ARTIST_CD)必须事先已经存在。我原以为 save() 命令会自动创建它,但实际上并不会。此外,目标表还必须包含您要写入的列族——这里是 "cf"。
示例 1)
spark-shell --driver-class-path={put apache lib path}:{put hbase lib path}
spark-shell --driver-class-path="/usr/local/Cellar/apache-spark/2.4.0/libexec/jars/*:/usr/local/Cellar/hbase-1.4.9/lib/*"
示例 2)
spark-shell --driver-class-path=$SPARK_HOME:$(hbase classpath)