Run LoadIncrementalHFiles from a Java client
I want to call the following command from my Java client code:

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/myuser/map_data/hfiles mytable

When I run the application, I get the following exception:
org.apache.hadoop.hbase.io.hfile.CorruptHFileException: Problem reading HFile Trailer from file webhdfs://myserver.de:50070/user/myuser/map_data/hfiles/b/b22db8e263b74a7dbd8e36f9ccf16508
at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:477)
at org.apache.hadoop.hbase.io.hfile.HFile.createReader(HFile.java:520)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.groupOrSplit(LoadIncrementalHFiles.java:632)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.call(LoadIncrementalHFiles.java:549)
at org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.call(LoadIncrementalHFiles.java:546)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.
at org.apache.hadoop.io.compress.SnappyCodec.checkNativeCodeLoaded(SnappyCodec.java:65)
at org.apache.hadoop.io.compress.SnappyCodec.getDecompressorType(SnappyCodec.java:193)
at org.apache.hadoop.io.compress.CodecPool.getDecompressor(CodecPool.java:178)
at org.apache.hadoop.hbase.io.compress.Compression$Algorithm.getDecompressor(Compression.java:327)
at org.apache.hadoop.hbase.io.compress.Compression.decompress(Compression.java:422)
at org.apache.hadoop.hbase.io.encoding.HFileBlockDefaultDecodingContext.prepareDecoding(HFileBlockDefaultDecodingContext.java:90)
at org.apache.hadoop.hbase.io.hfile.HFileBlock.unpack(HFileBlock.java:529)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader.nextBlock(HFileBlock.java:1350)
at org.apache.hadoop.hbase.io.hfile.HFileBlock$AbstractFSReader.nextBlockWithBlockType(HFileBlock.java:1356)
at org.apache.hadoop.hbase.io.hfile.HFileReaderV2.<init>(HFileReaderV2.java:149)
at org.apache.hadoop.hbase.io.hfile.HFileReaderV3.<init>(HFileReaderV3.java:77)
at org.apache.hadoop.hbase.io.hfile.HFile.pickReaderVersion(HFile.java:467)
... 8 more
Running the hbase ... command above on the console of my Hadoop server works fine. However, when I try to run it from my Java code using the HBase/Hadoop client libraries, it fails with the exception shown above. Here is the code snippet:
public static void main(String[] args) {
    try {
        Configuration conf = loginFromKeyTab("REALM.DE", "server.de", "user", "C:/user.keytab");
        conf.set("fs.webhdfs.impl", org.apache.hadoop.hdfs.web.WebHdfsFileSystem.class.getName());
        conf.set("hbase.zookeeper.quorum", "server1.de,server2.de,server3.de");
        conf.set("zookeeper.znode.parent", "/hbase-secure");
        conf.set("hbase.master.kerberos.principal", "hbase/_HOST@REALM.DE");
        conf.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@REALM.DE");
        conf.set("hbase.security.authentication", "kerberos");

        Connection connection = ConnectionFactory.createConnection(conf);
        Table table = connection.getTable(TableName.valueOf("mytable"));
        RegionLocator locator = connection.getRegionLocator(table.getName());

        Job job = Job.getInstance(conf, "Test Bulk Load");
        //HFileOutputFormat2.configureIncrementalLoad(job, table, locator);
        //Configuration conf2 = job.getConfiguration();

        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        loader.doBulkLoad(new Path(HDFS_PATH), connection.getAdmin(), table, locator);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Do I need to add a dependency to my project? If so, how/where, and which version? I am using HDP 2.5, which includes HBase 1.1.2 and Hadoop 2.7.3.
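Judging by the stack trace, the root cause looks like a missing native snappy library on the client rather than a missing Maven dependency: the HFiles were written with SNAPPY compression, and the libhadoop available to the client JVM was built or loaded without snappy support. A minimal diagnostic sketch (assuming the Hadoop 2.7.3 client jars are on the classpath) to check what the client JVM can actually load:

import org.apache.hadoop.util.NativeCodeLoader;

public class NativeSnappyCheck {
    public static void main(String[] args) {
        // True if the native libhadoop library was found on java.library.path
        boolean nativeLoaded = NativeCodeLoader.isNativeCodeLoaded();
        System.out.println("libhadoop loaded: " + nativeLoaded);
        // Only safe to call once libhadoop itself has been loaded
        if (nativeLoaded) {
            System.out.println("built with snappy: " + NativeCodeLoader.buildSupportsSnappy());
        }
    }
}

If this prints false on the client machine (likely on a Windows workstation, given the C:/user.keytab path), the exception above is expected: either the Hadoop native libraries have to be made available to the client JVM, or the bulk load has to run on a node that already has them, which is what the workaround below does.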
I found another solution to my problem: instead of using the LoadIncrementalHFiles class directly in my code, I use a Java program that starts a Process instance which in turn invokes the LoadIncrementalHFiles command (running directly on a Hadoop node). Here is the code snippet of my solution:
TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_OUTPUT_PATH), conf); // The HDFS_OUTPUT_PATH directory contains many HFile sub-directories
for (String hFileDir : subDirs) {
    String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
    String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, "mytable"}; // Important: Separate each parameter here!!!
    ProcessBuilder pb = new ProcessBuilder(execCode);
    pb.redirectErrorStream(true);
    final Process p = pb.start();

    new Thread(new Runnable() {
        public void run() {
            BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
            String line = null;
            try {
                while ((line = input.readLine()) != null)
                    System.out.println(line);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }).start();

    p.waitFor();
    int exitCode = p.exitValue();
    System.out.println(" ==> Exit Code: " + exitCode);
}
System.out.println("Finished");
If someone has another solution (for example, how to use the LoadIncrementalHFiles class directly in code), please let me know. Thanks!
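For completeness, one in-process alternative to shelling out: LoadIncrementalHFiles implements Hadoop's Tool interface, so it can also be driven through ToolRunner. The following is only a sketch (untested here), and it still requires the native snappy library to be available to the client JVM, which is exactly what caused the original exception:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

public class BulkLoadViaToolRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Same arguments as the command-line tool: <hfile directory> <table name>
        int rc = ToolRunner.run(conf,
                new LoadIncrementalHFiles(conf),
                new String[] { "/user/myuser/map_data/hfiles", "mytable" });
        System.out.println("Exit code: " + rc);
    }
}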