I have been trying to import data from a MySQL database into HBase using Sqoop, but I keep running into an error. Can you help me? (I am using Sqoop 1.)
My code is:
import com.cloudera.sqoop.SqoopOptions;
import com.cloudera.sqoop.tool.ImportTool;
import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
import com.cloudera.sqoop.tool.SqoopTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.sqoop.Sqoop;
public class SqoopScheduler {
    Logger log = Logger.getLogger(SqoopScheduler.class);
    private static Configuration configuration = null;
    // Renamed from "SqoopOptions" so the field no longer shadows the class name.
    private static SqoopOptions options = new SqoopOptions();
    private static final String driver = "com.mysql.jdbc.Driver";
    private static final String connectionString = "jdbc:mysql://jira.com:3306/jirarepository";
    private static final String username = "jiraadmin";
    private static final String password = "jiraadmin";
    private static final String splitBy = "issue_id";
    private static final int Counter = 21000;
    private static final String queryString = "select * from issues where issue_id < ";

    // Configure a free-form query import from MySQL into the HBase table "jira_issues".
    private static void setUp() {
        options.setJobName("HBase_SequentialImport");
        options.setMapreduceJobName("HBase_SequentialImport");
        options.setDriverClassName(driver);
        options.setConnectString(connectionString);
        options.setUsername(username);
        options.setPassword(password);
        options.setSplitByCol(splitBy);
        options.setSqlQuery(queryString + Counter + " and $CONDITIONS");
        options.setHBaseBulkLoadEnabled(true);
        options.setHBaseTable("jira_issues");
        options.setHBaseColFamily("issue_detail");
        options.setHBaseRowKeyColumn(splitBy);
    }

    // Run the import and fail loudly on a non-zero return code.
    private static int runIt() {
        int res;
        res = new ImportTool().run(options);
        if (res != 0) {
            throw new RuntimeException("Sqoop API Failed - return code : " + Integer.toString(res));
        }
        return res;
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {
        setUp();
        int result = runIt();
        System.out.println(result);
    }
}
The error I get is:
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.JavaMain], main() threw exception, java.lang.NullPointerException
org.apache.oozie.action.hadoop.JavaMainException: java.lang.NullPointerException
at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:60)
at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:46)
at org.apache.oozie.action.hadoop.JavaMain.main(JavaMain.java:38)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:228)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:370)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:295)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access$200(LocalContainerLauncher.java:181)
at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler$1.run(LocalContainerLauncher.java:224)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.fs.FileSystem.fixRelativePart(FileSystem.java:2147)
at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:633)
at org.apache.sqoop.mapreduce.HBaseBulkImportJob.jobTeardown(HBaseBulkImportJob.java:124)
at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:282)
at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:724)
at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:499)
at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
at SqoopScheduler.runIt(SqoopScheduler.java:61)
at SqoopScheduler.main(SqoopScheduler.java:75)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:57)
... 19 more
2 Answers
Answer 1 (50few1ms)
Change the query string so that $CONDITIONS comes first, immediately after the WHERE keyword.
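As a rough sketch of what that suggestion would look like for the query in the question: the helper below only builds the string, with $CONDITIONS placed right after WHERE and the issue_id filter appended afterwards. The class and method names (ConditionsFirstQuery, buildQuery) are made up for illustration and are not part of Sqoop.

```java
// Hypothetical helper, not part of Sqoop: builds the free-form query
// with the $CONDITIONS placeholder placed directly after WHERE.
final class ConditionsFirstQuery {
    private static final String BASE_QUERY = "select * from issues where $CONDITIONS";

    private ConditionsFirstQuery() {
    }

    // upperBound plays the role of the question's "Counter" constant.
    static String buildQuery(int upperBound) {
        return BASE_QUERY + " and issue_id < " + upperBound;
    }
}
```

In the question's setUp() this would replace the argument passed to setSqlQuery, for example setSqlQuery(ConditionsFirstQuery.buildQuery(21000)).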
Answer 2 (c9x0cxw0)
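After a few attempts, I found that the error comes from a flaw in the way the MySQL map-reduce import works. It tries to set the fetch size internally, and when that fails, the Sqoop map-reduce job fails as well.
I am answering my own question so that anyone else stuck on this can move forward easily.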
All you have to do here is specify an explicit fetch size in the Sqoop options.
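The snippet from this answer did not survive, so the following is only a minimal sketch of the idea, assuming Sqoop 1 is on the classpath and that the option is set through SqoopOptions.setFetchSize (the programmatic counterpart of the --fetch-size command-line argument). The value 1000 is an arbitrary placeholder, not the value the answer used, and the class name FetchSizeExample is hypothetical.

```java
import com.cloudera.sqoop.SqoopOptions;

// Hypothetical example class; requires the Sqoop 1 jars on the classpath.
public class FetchSizeExample {
    public static void main(String[] args) {
        SqoopOptions options = new SqoopOptions();
        // Set the fetch size explicitly instead of leaving it unset.
        // 1000 is an assumed, illustrative value; tune it for your data.
        options.setFetchSize(1000);
        System.out.println("fetch size = " + options.getFetchSize());
    }
}
```

In the question's code, the same call would go into setUp() next to the other setters, before ImportTool.run is invoked.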
After that, the import works fine.