Sqoop 使用 Java API 导入到 Hbase 表

Sqoop using Java API to import to Hbase tables

我一直在尝试使用 Sqoop 将数据从 MySQL 数据库导入到 Hbase,但是运行时始终出错。请问你能帮我吗?(我正在使用 Sqoop 1)

我的代码如下:

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.tool.SqoopTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.sqoop.Sqoop;


/**
 * Imports MySQL {@code issues} rows into the HBase table {@code jira_issues}
 * using the Sqoop 1 Java API ({@code ImportTool}).
 *
 * <p>NOTE: an explicit JDBC fetch size must be configured. Without it, the
 * internal Sqoop map-reduce job fails while tearing down the HBase bulk-load
 * with a {@code NullPointerException} (see
 * {@code HBaseBulkImportJob.jobTeardown} in the stack trace below).
 */
public class SqoopScheduler {

    private static final Logger LOG = Logger.getLogger(SqoopScheduler.class);

    // Single shared options object: configured in setUp(), consumed in runIt().
    private static final SqoopOptions sqoopOptions = new SqoopOptions();

    // MySQL connection settings for the JIRA repository database.
    private static final String DRIVER = "com.mysql.jdbc.Driver";
    private static final String CONNECTION_STRING = "jdbc:mysql://jira.com:3306/jirarepository";
    private static final String USERNAME = "jiraadmin";
    private static final String PASSWORD = "jiraadmin";

    // Column used both to split map tasks and as the HBase row key.
    private static final String SPLIT_BY = "issue_id";
    // Upper bound on issue_id for this sequential import chunk.
    private static final int ISSUE_ID_LIMIT = 21000;
    private static final String QUERY_STRING = "select * from issues where issue_id < ";

    /** Populates the shared {@link SqoopOptions} with the MySQL-to-HBase import settings. */
    private static void setUp() {
        sqoopOptions.setJobName("HBase_SequentialImport");
        sqoopOptions.setMapreduceJobName("HBase_SequentialImport");
        sqoopOptions.setDriverClassName(DRIVER);
        sqoopOptions.setConnectString(CONNECTION_STRING);
        sqoopOptions.setUsername(USERNAME);
        sqoopOptions.setPassword(PASSWORD);
        sqoopOptions.setSplitByCol(SPLIT_BY);
        // $CONDITIONS is mandatory in Sqoop free-form queries; Sqoop substitutes
        // each mapper's split predicate into it.
        sqoopOptions.setSqlQuery(QUERY_STRING + ISSUE_ID_LIMIT + " and $CONDITIONS");
        sqoopOptions.setHBaseBulkLoadEnabled(true);

        // FIX: set an explicit fetch size. Leaving it unset makes the internal
        // Sqoop map-reduce job NPE during HBase bulk-load teardown (the failure
        // diagnosed in this post).
        sqoopOptions.setFetchSize(2000);

        sqoopOptions.setHBaseTable("jira_issues");
        sqoopOptions.setHBaseColFamily("issue_detail");
        sqoopOptions.setHBaseRowKeyColumn(SPLIT_BY);
    }

    /**
     * Runs the configured import.
     *
     * @return the Sqoop exit code (always 0 when this method returns normally)
     * @throws RuntimeException if Sqoop reports a non-zero exit code
     */
    private static int runIt() {
        int res = new ImportTool().run(sqoopOptions);
        if (res != 0) {
            throw new RuntimeException("Sqoop API Failed - return code : " + Integer.toString(res));
        }
        return res;
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        setUp();
        int result = runIt();
        System.out.println(result);
    }
}

我运行时遇到的错误是:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.JavaMain], main() threw exception, java.lang.NullPointerException
org.apache.oozie.action.hadoop.JavaMainException: java.lang.NullPointerException
    at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:60)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:46)
    at org.apache.oozie.action.hadoop.JavaMain.main(JavaMain.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:228)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:370)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:295)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access0(LocalContainerLauncher.java:181)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.run(LocalContainerLauncher.java:224)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.FileSystem.fixRelativePart(FileSystem.java:2147)
    at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:633)
    at org.apache.sqoop.mapreduce.HBaseBulkImportJob.jobTeardown(HBaseBulkImportJob.java:124)
    at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:282)
    at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:724)
    at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:499)
    at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
    at SqoopScheduler.runIt(SqoopScheduler.java:61)
    at SqoopScheduler.main(SqoopScheduler.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:57)
    ... 19 more

我也尝试过修改 queryString,让 WHERE 子句以 $CONDITIONS 开头:

 private static final String queryString = "select * from issues WHERE $CONDITIONS AND issue_id < ";



SqoopOptions.setSqlQuery(queryString + Counter);

经过几次尝试后发现,该错误是由 Sqoop 处理 MySQL 时的一个缺陷引起的:内部的 Sqoop map-reduce 作业在设置获取大小(fetch size)时失败了。

回答这个问题,以便任何其他被这个问题困扰的人都可以轻松继续。

您在这里所要做的就是在 SqoopOptions 中指定一个显式的获取大小。例如,在配置其他选项的地方(如 setUp() 方法中)加上:

private static SqoopOptions SqoopOptions = new SqoopOptions();

// 在 setUp() 中:
SqoopOptions.setFetchSize(2000);

然后应该就可以正常工作了。