使用 Sqoop 的 Java API 将数据导入 HBase 表
Using the Sqoop Java API to import into HBase tables
我一直在尝试使用 Sqoop 将数据从 MySQL 数据库导入到 HBase,但运行时始终出错。请问你能帮帮我吗?(我使用的是 Sqoop 1)
我的代码如下:
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.tool.SqoopTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.sqoop.Sqoop;
/**
 * Command-line entry point that configures a Sqoop 1 import through the Java
 * API and runs it once.
 *
 * <p>The import is driven by a free-form SQL query against a MySQL connection
 * and targets an HBase table with bulk load enabled. Every setting is a
 * hard-coded constant of this class.
 */
public class SqoopScheduler {

    // Neither of the next two fields is referenced elsewhere in this class.
    Logger log = Logger.getLogger(SqoopScheduler.class);
    private static Configuration configuration = null;

    /** Shared options object: filled in by {@link #setUp()}, consumed by {@link #runIt()}. */
    private static SqoopOptions SqoopOptions = new SqoopOptions();

    // JDBC connection settings.
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String connectionString = "jdbc:mysql://jira.com:3306/jirarepository";
    private static final String username = "jiraadmin";
    private static final String password = "jiraadmin";

    /** Column used both as the split column and as the HBase row key. */
    private static final String splitBy = "issue_id";

    /** Exclusive upper bound appended to {@link #queryString}. */
    private static final int Counter = 21000;

    /** Query prefix; the bound and the {@code $CONDITIONS} token are appended in {@link #setUp()}. */
    private static final String queryString = "select * from issues where issue_id < ";

    /**
     * Populates the shared options object with the job name, the JDBC
     * connection details, the import query and the HBase target settings.
     */
    private static void setUp() {
        // Job identification.
        SqoopOptions.setJobName("HBase_SequentialImport");
        SqoopOptions.setMapreduceJobName("HBase_SequentialImport");

        // Source database.
        SqoopOptions.setDriverClassName(driver);
        SqoopOptions.setConnectString(connectionString);
        SqoopOptions.setUsername(username);
        SqoopOptions.setPassword(password);

        // Query: "<prefix><bound> and $CONDITIONS", split on the issue id column.
        final String boundedQuery = queryString + Counter + " and $CONDITIONS";
        SqoopOptions.setSplitByCol(splitBy);
        SqoopOptions.setSqlQuery(boundedQuery);

        // HBase destination.
        SqoopOptions.setHBaseBulkLoadEnabled(true);
        SqoopOptions.setHBaseTable("jira_issues");
        SqoopOptions.setHBaseColFamily("issue_detail");
        SqoopOptions.setHBaseRowKeyColumn(splitBy);
    }

    /**
     * Runs the import tool with the shared options.
     *
     * @return the tool's return code, which is {@code 0} whenever this method returns normally
     * @throws RuntimeException if the tool reports a non-zero return code
     */
    private static int runIt() {
        final int exitCode = new ImportTool().run(SqoopOptions);
        if (exitCode == 0) {
            return exitCode;
        }
        throw new RuntimeException("Sqoop API Failed - return code : " + exitCode);
    }

    /**
     * Configures the import, runs it, and prints the return code to standard output.
     *
     * @param args ignored
     * @throws Exception declared for callers; any failure from the import propagates unchanged
     */
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        setUp();
        System.out.println(runIt());
    }
}
我运行时遇到的错误是:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.JavaMain], main() threw exception, java.lang.NullPointerException
org.apache.oozie.action.hadoop.JavaMainException: java.lang.NullPointerException
at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:60)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:46)
at org.apache.oozie.action.hadoop.JavaMain.main(JavaMain.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:228)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:370)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:295)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access0(LocalContainerLauncher.java:181)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.run(LocalContainerLauncher.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.fixRelativePart(FileSystem.java:2147)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:633)
at org.apache.sqoop.mapreduce.HBaseBulkImportJob.jobTeardown(HBaseBulkImportJob.java:124)
at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:282)
at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:724)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:499)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
at SqoopScheduler.runIt(SqoopScheduler.java:61)
at SqoopScheduler.main(SqoopScheduler.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:57)
... 19 more
修改 queryString,使 WHERE 子句以 $CONDITIONS 开头:
private static final String queryString = "select * from issues WHERE $CONDITIONS AND issue_id < ";
SqoopOptions.setSqlQuery(queryString + Counter);
经过几次尝试后,我发现这个错误是由 Sqoop 针对 MySQL 的 MapReduce 实现中的一个缺陷造成的:它在为 Sqoop 内部的 MapReduce 作业设置获取大小(fetch size)时失败了。
在此回答这个问题,以便其他被同样问题困扰的人能够顺利继续。
您在这里所要做的就是在 SqoopOptions 中指定一个显式的获取大小,例如:
private static SqoopOptions SqoopOptions = new SqoopOptions();
SqoopOptions.setFetchSize(2000);
然后应该就可以正常工作了。
我一直在尝试使用 Sqoop 将数据从 MySQL 数据库导入到 HBase,但运行时始终出错。请问你能帮帮我吗?(我使用的是 Sqoop 1)
我的代码如下:
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.tool.SqoopTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.sqoop.Sqoop;
/**
 * Command-line entry point that configures a Sqoop 1 import through the Java
 * API and runs it once.
 *
 * <p>The import is driven by a free-form SQL query against a MySQL connection
 * and targets an HBase table with bulk load enabled. Every setting is a
 * hard-coded constant of this class.
 */
public class SqoopScheduler {

    // Neither of the next two fields is referenced elsewhere in this class.
    Logger log = Logger.getLogger(SqoopScheduler.class);
    private static Configuration configuration = null;

    /** Shared options object: filled in by {@link #setUp()}, consumed by {@link #runIt()}. */
    private static SqoopOptions SqoopOptions = new SqoopOptions();

    // JDBC connection settings.
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String connectionString = "jdbc:mysql://jira.com:3306/jirarepository";
    private static final String username = "jiraadmin";
    private static final String password = "jiraadmin";

    /** Column used both as the split column and as the HBase row key. */
    private static final String splitBy = "issue_id";

    /** Exclusive upper bound appended to {@link #queryString}. */
    private static final int Counter = 21000;

    /** Query prefix; the bound and the {@code $CONDITIONS} token are appended in {@link #setUp()}. */
    private static final String queryString = "select * from issues where issue_id < ";

    /**
     * Populates the shared options object with the job name, the JDBC
     * connection details, the import query and the HBase target settings.
     */
    private static void setUp() {
        // Job identification.
        SqoopOptions.setJobName("HBase_SequentialImport");
        SqoopOptions.setMapreduceJobName("HBase_SequentialImport");

        // Source database.
        SqoopOptions.setDriverClassName(driver);
        SqoopOptions.setConnectString(connectionString);
        SqoopOptions.setUsername(username);
        SqoopOptions.setPassword(password);

        // Query: "<prefix><bound> and $CONDITIONS", split on the issue id column.
        final String boundedQuery = queryString + Counter + " and $CONDITIONS";
        SqoopOptions.setSplitByCol(splitBy);
        SqoopOptions.setSqlQuery(boundedQuery);

        // HBase destination.
        SqoopOptions.setHBaseBulkLoadEnabled(true);
        SqoopOptions.setHBaseTable("jira_issues");
        SqoopOptions.setHBaseColFamily("issue_detail");
        SqoopOptions.setHBaseRowKeyColumn(splitBy);
    }

    /**
     * Runs the import tool with the shared options.
     *
     * @return the tool's return code, which is {@code 0} whenever this method returns normally
     * @throws RuntimeException if the tool reports a non-zero return code
     */
    private static int runIt() {
        final int exitCode = new ImportTool().run(SqoopOptions);
        if (exitCode == 0) {
            return exitCode;
        }
        throw new RuntimeException("Sqoop API Failed - return code : " + exitCode);
    }

    /**
     * Configures the import, runs it, and prints the return code to standard output.
     *
     * @param args ignored
     * @throws Exception declared for callers; any failure from the import propagates unchanged
     */
    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        setUp();
        System.out.println(runIt());
    }
}
我运行时遇到的错误是:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.JavaMain], main() threw exception, java.lang.NullPointerException
org.apache.oozie.action.hadoop.JavaMainException: java.lang.NullPointerException
at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:60)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:46)
at org.apache.oozie.action.hadoop.JavaMain.main(JavaMain.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:228)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:370)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:295)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access0(LocalContainerLauncher.java:181)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.run(LocalContainerLauncher.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.fixRelativePart(FileSystem.java:2147)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:633)
at org.apache.sqoop.mapreduce.HBaseBulkImportJob.jobTeardown(HBaseBulkImportJob.java:124)
at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:282)
at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:724)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:499)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
at SqoopScheduler.runIt(SqoopScheduler.java:61)
at SqoopScheduler.main(SqoopScheduler.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:57)
... 19 more
修改 queryString,使 WHERE 子句以 $CONDITIONS 开头:
private static final String queryString = "select * from issues WHERE $CONDITIONS AND issue_id < ";
SqoopOptions.setSqlQuery(queryString + Counter);
经过几次尝试后,我发现这个错误是由 Sqoop 针对 MySQL 的 MapReduce 实现中的一个缺陷造成的:它在为 Sqoop 内部的 MapReduce 作业设置获取大小(fetch size)时失败了。
在此回答这个问题,以便其他被同样问题困扰的人能够顺利继续。
您在这里所要做的就是在 SqoopOptions 中指定一个显式的获取大小,例如:
private static SqoopOptions SqoopOptions = new SqoopOptions();
SqoopOptions.setFetchSize(2000);
然后应该就可以正常工作了。