Importing into an HBase table with Sqoop using the Java API

gzjq41n4 · asked on 2021-06-02 · in Hadoop
Follow (0) | Answers (2) | Views (427)

I have been trying to import data from a MySQL database into HBase using Sqoop, but I am running into an error. Could you help me out? (I am using Sqoop 1.)
My code is:

import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

public class SqoopScheduler {

    Logger log = Logger.getLogger(SqoopScheduler.class);

    private static Configuration configuration = null;
    private static SqoopOptions SqoopOptions = new SqoopOptions();
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String connectionString = "jdbc:mysql://jira.com:3306/jirarepository";
    private static final String username = "jiraadmin";
    private static final String password = "jiraadmin";
    private static final String splitBy = "issue_id";
    private static final int Counter = 21000;
    private static final String queryString = "select * from issues where issue_id < ";

    private static void setUp() {

        SqoopOptions.setJobName("HBase_SequentialImport");
        SqoopOptions.setMapreduceJobName("HBase_SequentialImport");
        SqoopOptions.setDriverClassName(driver);
        SqoopOptions.setConnectString(connectionString);
        SqoopOptions.setUsername(username);
        SqoopOptions.setPassword(password);
        SqoopOptions.setSplitByCol(splitBy);
        SqoopOptions.setSqlQuery(queryString + Counter + " and $CONDITIONS");
        SqoopOptions.setHBaseBulkLoadEnabled(true);

        SqoopOptions.setHBaseTable("jira_issues");
        SqoopOptions.setHBaseColFamily("issue_detail");
        SqoopOptions.setHBaseRowKeyColumn(splitBy);

    }

    private static int runIt() {
        int res;
        res = new ImportTool().run(SqoopOptions);
        if (res != 0) {
            throw new RuntimeException("Sqoop API Failed - return code : "+ Integer.toString(res));
        }
        return res;
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        setUp();
        int result = runIt();
        System.out.println(result);

    }
}

The error I am getting is:

Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.JavaMain], main() threw exception, java.lang.NullPointerException
org.apache.oozie.action.hadoop.JavaMainException: java.lang.NullPointerException
    at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:60)
    at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:46)
    at org.apache.oozie.action.hadoop.JavaMain.main(JavaMain.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:228)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:370)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:295)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access$200(LocalContainerLauncher.java:181)
    at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler$1.run(LocalContainerLauncher.java:224)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at org.apache.hadoop.fs.FileSystem.fixRelativePart(FileSystem.java:2147)
    at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:633)
    at org.apache.sqoop.mapreduce.HBaseBulkImportJob.jobTeardown(HBaseBulkImportJob.java:124)
    at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:282)
    at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:724)
    at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:499)
    at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
    at SqoopScheduler.runIt(SqoopScheduler.java:61)
    at SqoopScheduler.main(SqoopScheduler.java:75)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:57)
    ... 19 more

50few1ms1#

Change the query string so that `$CONDITIONS` comes immediately after the WHERE clause:

private static final String queryString = "select * from issues WHERE $CONDITIONS AND issue_id < ";

SqoopOptions.setSqlQuery(queryString + Counter);
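To see what Sqoop ends up receiving, here is a minimal sketch (plain Java, no Sqoop dependency) of how the corrected free-form query is assembled; the `21000` counter value is taken from the question's code. Sqoop requires the `$CONDITIONS` placeholder in the WHERE clause of any free-form query and substitutes each mapper's split predicate for it at runtime.

```java
public class ConditionsQueryDemo {

    // Builds the free-form query exactly as the corrected answer does:
    // fixed prefix with the $CONDITIONS placeholder, counter appended last.
    static String buildQuery(int counter) {
        String queryString = "select * from issues WHERE $CONDITIONS AND issue_id < ";
        return queryString + counter;
    }

    public static void main(String[] args) {
        // This is the string handed to SqoopOptions.setSqlQuery(...).
        System.out.println(buildQuery(21000));
        // prints: select * from issues WHERE $CONDITIONS AND issue_id < 21000
    }
}
```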

c9x0cxw02#

After several more attempts, it turned out the error comes from how the MySQL map-reduce path behaves: when Sqoop tries to set the fetch size internally, the map-reduce job fails.
Answering my own question so that anyone stuck on this can move forward easily.
All you have to do is specify an explicit fetch size in the Sqoop options, for example:

private static SqoopOptions SqoopOptions = new SqoopOptions();

// inside setUp(), alongside the other options:
SqoopOptions.setFetchSize(2000);

After that, it works fine.
