Spark 和 HBase 快照
Spark and HBase Snapshots
如果直接从 HDFS 而不是使用 HBase API,我们可以更快地访问数据,我们正在尝试基于来自 HBase 的 Table 快照构建 RDD .
所以,我有一个名为 "dm_test_snap" 的快照。我似乎能够使大部分配置工作正常,但我的 RDD 为空(尽管快照本身中有数据)。
我很难找到任何人使用 Spark 对 HBase 快照进行离线分析的示例,但我不敢相信只有我一个人在尝试使它正常工作。非常感谢任何帮助或建议。
这是我的代码片段:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
val scan = new Scan
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
}
更新以包含解决方案
诀窍是,正如@Holden 在下面提到的,conf 没有通过。为了解决这个问题,我能够通过将对 newAPIHadoopRDD 的调用更改为这样来使其工作:
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
@victor 的回答也强调了第二个问题,即我没有通过扫描。为了解决这个问题,我添加了这一行和方法:
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
这也让我从 conf.set 命令中提取了这一行:
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
*注意:这是针对 CDH5.0 上的 HBase 版本 0.96.1.1
便于参考的最终完整代码:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
val scan = new Scan
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
}
查看作业信息,它会复制您提供给它的 conf 对象 (The Job makes a copy of the Configuration so that any necessary internal modifications do not reflect on the incoming parameter.
),因此很可能您需要在 conf 对象上设置的信息没有被传递下降到星火。您可以改用 TableSnapshotInputFormatImpl
,它具有适用于 conf 对象的类似方法。可能还需要额外的东西,但首先通过这个问题,这似乎是最可能的原因。
正如评论中所指出的,另一种选择是使用 job.getConfiguration
从作业对象中获取更新的配置。
您没有正确配置您的 M/R 作业:
这是 Java 中关于如何通过快照配置 M/R 的示例:
Job job = new Job(conf);
Scan scan = new Scan();
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, MyTableMapper.class, MyMapKeyOutput.class,
MyMapOutputValueWritable.class, job, true);
}
您肯定跳过了扫描。我建议您查看 TableMapReduceUtil 的 initTableSnapshotMapperJob 实现,了解如何在 Spark/Scala.
中配置作业
这是 mapreduce 中的完整配置 Java
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, // Name of the snapshot
scan, // Scan instance to control CF and attribute selection
DefaultMapper.class, // mapper class
NullWritable.class, // mapper output key
Text.class, // mapper output value
job,
true,
restoreDir);
如果直接从 HDFS 而不是使用 HBase API,我们可以更快地访问数据,我们正在尝试基于来自 HBase 的 Table 快照构建 RDD .
所以,我有一个名为 "dm_test_snap" 的快照。我似乎能够使大部分配置工作正常,但我的 RDD 为空(尽管快照本身中有数据)。
我很难找到任何人使用 Spark 对 HBase 快照进行离线分析的示例,但我不敢相信只有我一个人在尝试使它正常工作。非常感谢任何帮助或建议。
这是我的代码片段:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
val scan = new Scan
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
}
更新以包含解决方案 诀窍是,正如@Holden 在下面提到的,conf 没有通过。为了解决这个问题,我能够通过将对 newAPIHadoopRDD 的调用更改为这样来使其工作:
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
@victor 的回答也强调了第二个问题,即我没有通过扫描。为了解决这个问题,我添加了这一行和方法:
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
这也让我从 conf.set 命令中提取了这一行:
conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "dm_test_snap")
*注意:这是针对 CDH5.0 上的 HBase 版本 0.96.1.1
便于参考的最终完整代码:
object TestSnap {
def main(args: Array[String]) {
val config = ConfigFactory.load()
val hbaseRootDir = config.getString("hbase.rootdir")
val sparkConf = new SparkConf()
.setAppName("testnsnap")
.setMaster(config.getString("spark.app.master"))
.setJars(SparkContext.jarOfObject(this))
.set("spark.executor.memory", "2g")
.set("spark.default.parallelism", "160")
val sc = new SparkContext(sparkConf)
println("Creating hbase configuration")
val conf = HBaseConfiguration.create()
conf.set("hbase.rootdir", hbaseRootDir)
conf.set("hbase.zookeeper.quorum", config.getString("hbase.zookeeper.quorum"))
conf.set("zookeeper.session.timeout", config.getString("zookeeper.session.timeout"))
val scan = new Scan
conf.set(TableInputFormat.SCAN, convertScanToString(scan))
val job = Job.getInstance(conf)
TableSnapshotInputFormat.setInput(job, "dm_test_snap",
new Path("hdfs://nameservice1/tmp"))
val hBaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hBaseRDD.count()
System.exit(0)
}
def convertScanToString(scan : Scan) = {
val proto = ProtobufUtil.toScan(scan);
Base64.encodeBytes(proto.toByteArray());
}
}
查看作业信息,它会复制您提供给它的 conf 对象 (The Job makes a copy of the Configuration so that any necessary internal modifications do not reflect on the incoming parameter.
),因此很可能您需要在 conf 对象上设置的信息没有被传递下降到星火。您可以改用 TableSnapshotInputFormatImpl
,它具有适用于 conf 对象的类似方法。可能还需要额外的东西,但首先通过这个问题,这似乎是最可能的原因。
正如评论中所指出的,另一种选择是使用 job.getConfiguration
从作业对象中获取更新的配置。
您没有正确配置您的 M/R 作业: 这是 Java 中关于如何通过快照配置 M/R 的示例:
Job job = new Job(conf);
Scan scan = new Scan();
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName,
scan, MyTableMapper.class, MyMapKeyOutput.class,
MyMapOutputValueWritable.class, job, true);
}
您肯定跳过了扫描。我建议您查看 TableMapReduceUtil 的 initTableSnapshotMapperJob 实现,了解如何在 Spark/Scala.
中配置作业这是 mapreduce 中的完整配置 Java
TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, // Name of the snapshot
scan, // Scan instance to control CF and attribute selection
DefaultMapper.class, // mapper class
NullWritable.class, // mapper output key
Text.class, // mapper output value
job,
true,
restoreDir);