使用 LoadIncrementalHFiles 和子目录批量加载
Bulk loading with LoadIncrementalHFiles and subdirectories
我编写了一个 Spark 应用程序,它生成 HFiles 以便稍后使用 LoadIncrementalHFiles
命令进行批量加载。由于源数据池非常大,输入文件被分成一个接一个地处理的迭代。每次迭代都会创建自己的 HFile
目录,因此我的 HDFS 结构如下所示:
/user/myuser/map_data/hfiles_0
... /hfiles_1
... /hfiles_2
... /hfiles_3
...
这个 map_data
目录中大约有 500 个文件,因此我正在寻找一种方法来自动调用 LoadIncrementalHFiles
函数,以便稍后在迭代中处理这些子目录。
相应的命令是这样的:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable
我需要将其更改为迭代命令,因为该命令不适用于子目录(当我使用 /user/myuser/map_data
目录调用它时)!
我尝试使用 Java Process
实例来自动执行上面的命令,但这并没有做任何事情(控制台没有输出,我的 HBase 中也没有更多行table).
在我的代码中使用 org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
Java class 也不起作用,它也没有响应!
有没有人可以为我举个例子?或者是否有一个参数可以在父目录上 运行 上面的 hbase
命令?我在 Hortonworks Data Platform 2.5 集群中使用 HBase 1.1.2。
EDIT 我试图从 Hadoop 客户端 Java 应用程序 运行 LoadIncrementalHFiles
命令,但我得到一个异常相关要快速压缩,请参阅
解决方案是将 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable
命令分成许多部分(每个命令部分一个),请参阅此 Java 代码片段:
TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_PATH), hadoopConf);
for(String hFileDir : subDirs) {
try {
String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
==> String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, hbaseTableName};
ProcessBuilder pb = new ProcessBuilder(execCode);
pb.redirectErrorStream(true);
final Process p = pb.start();
// Write the output of the Process to the console
new Thread(new Runnable() {
public void run() {
BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = null;
try {
while ((line = input.readLine()) != null)
System.out.println(line);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
// Wait for the end of the execution
p.waitFor();
...
}
我编写了一个 Spark 应用程序,它生成 HFiles 以便稍后使用 LoadIncrementalHFiles
命令进行批量加载。由于源数据池非常大,输入文件被分成一个接一个地处理的迭代。每次迭代都会创建自己的 HFile
目录,因此我的 HDFS 结构如下所示:
/user/myuser/map_data/hfiles_0
... /hfiles_1
... /hfiles_2
... /hfiles_3
...
这个 map_data
目录中大约有 500 个文件,因此我正在寻找一种方法来自动调用 LoadIncrementalHFiles
函数,以便稍后在迭代中处理这些子目录。
相应的命令是这样的:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable
我需要将其更改为迭代命令,因为该命令不适用于子目录(当我使用 /user/myuser/map_data
目录调用它时)!
我尝试使用 Java Process
实例来自动执行上面的命令,但这并没有做任何事情(控制台没有输出,我的 HBase 中也没有更多行table).
在我的代码中使用 org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
Java class 也不起作用,它也没有响应!
有没有人可以为我举个例子?或者是否有一个参数可以在父目录上 运行 上面的 hbase
命令?我在 Hortonworks Data Platform 2.5 集群中使用 HBase 1.1.2。
EDIT 我试图从 Hadoop 客户端 Java 应用程序 运行 LoadIncrementalHFiles
命令,但我得到一个异常相关要快速压缩,请参阅
解决方案是将 hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles -Dcreate.table=no /user/myuser/map_data/hfiles_0 mytable
命令分成许多部分(每个命令部分一个),请参阅此 Java 代码片段:
TreeSet<String> subDirs = getHFileDirectories(new Path(HDFS_PATH), hadoopConf);
for(String hFileDir : subDirs) {
try {
String pathToReadFrom = HDFS_OUTPUT_PATH + "/" + hFileDir;
==> String[] execCode = {"hbase", "org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles", "-Dcreate.table=no", pathToReadFrom, hbaseTableName};
ProcessBuilder pb = new ProcessBuilder(execCode);
pb.redirectErrorStream(true);
final Process p = pb.start();
// Write the output of the Process to the console
new Thread(new Runnable() {
public void run() {
BufferedReader input = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line = null;
try {
while ((line = input.readLine()) != null)
System.out.println(line);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
// Wait for the end of the execution
p.waitFor();
...
}