MapReduce job to HBase throws IOException: Pass a Delete or a Put

ct2axkht posted 2021-06-03 in Hadoop

While running Hadoop 2.4.0 and HBase 0.94.18 on EMR, I'm trying to write from the mapper directly to an HBase table.
I'm getting a nasty IOException: Pass a Delete or a Put when executing the code below.

public class TestHBase {
  static class ImportMapper 
            extends Mapper<MyKey, MyValue, ImmutableBytesWritable, Writable> {
    private byte[] family = Bytes.toBytes("f");

    @Override
    public void map(MyKey key, MyValue value, Context context)
            throws IOException, InterruptedException {
      MyItem item = ...; // do some stuff with key/value and create the item
      byte[] rowKey = Bytes.toBytes(item.getKey());
      Put put = new Put(rowKey);
      for (String attr : Arrays.asList("a1", "a2", "a3")) {
        byte[] qualifier = Bytes.toBytes(attr);
        put.add(family, qualifier, Bytes.toBytes(item.get(attr)));
      }
      context.write(new ImmutableBytesWritable(rowKey), put);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String input = args[0];
    String table = "table";
    Job job = Job.getInstance(conf, "stuff");

    job.setJarByClass(ImportMapper.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path(input));

    TableMapReduceUtil.initTableReducerJob(
            table,                  // output table
            null,                   // reducer class
            job);
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Does anyone know what I'm doing wrong?
Stack trace:
Error: java.io.IOException: Pass a Delete or a Put
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
    at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
    at org.apache.hadoop.mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:646)
    at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    at org.apache.hadoop.mapreduce.lib.map.WrappedMapper$Context.write(WrappedMapper.java:112)
    at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:775)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Container killed by the ApplicationMaster. Container killed on request. Exit code is 143. Container exited with a non-zero exit code 143


tzdcorbm1#

It would have been better to show the full stack trace so I could help you solve this more easily. I haven't executed your code. As far as I understand, the problem may be job.setNumReduceTasks(0); with zero reduce tasks, the mapper is expected to write your Put objects directly to Apache HBase. You could increase setNumReduceTasks, or, if you look at the API, find its default value and comment that line out.
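For comparison, here is a minimal sketch of a map-only setup that writes Puts straight to the table, reusing the question's class names (whether this is the actual root cause here is my assumption). Note the explicit job.setMapperClass(...): setJarByClass(...) alone does not register the mapper, and the stack trace shows the stock Mapper.map frame, i.e. the identity mapper forwarding the raw input values, which TableOutputFormat then rejects.

Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "stuff");
job.setJarByClass(TestHBase.class);
job.setMapperClass(ImportMapper.class);   // without this, the identity Mapper runs
job.setInputFormatClass(SequenceFileInputFormat.class);
FileInputFormat.setInputDirRecursive(job, true);
FileInputFormat.addInputPath(job, new Path(args[0]));

TableMapReduceUtil.initTableReducerJob(
        "table",                // output table
        null,                   // no reducer: TableOutputFormat receives the map output
        job);
job.setNumReduceTasks(0);       // map-only is fine, as long as the mapper emits Puts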


kq4fsx7k2#

Thanks for adding the stack trace. Unfortunately, you didn't include the code that throws the exception, so I couldn't trace it fully for you. Instead, I did some searching and found a few things for you.
Your stack trace is similar to the one in this other question: Pass a Delete or a Put error in hbase mapreduce
That person fixed the problem by commenting out job.setNumReduceTasks(0); There is a similar SO question with the same exception, but commenting that line out couldn't fix the problem there; instead, it turned out to be a problem with annotations:
"java.io.IOException: Pass a Delete or a Put" when reading HDFS and storing HBase
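For illustration, here is a hypothetical reducer (the class, family, and qualifier names are made up for this sketch) showing that pitfall: if the reduce signature doesn't match exactly, say Iterator instead of Iterable, the method overloads instead of overriding, the default pass-through reduce runs, and TableOutputFormat receives values that are neither a Delete nor a Put. Adding @Override turns the mistake into a compile error:

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class SummaryReducer
        extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {

  @Override  // fails to compile if the signature is wrong -- catches the bug early
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    byte[] rowKey = Bytes.toBytes(key.toString());
    Put put = new Put(rowKey);
    // HBase 0.94-era API, matching the question's code
    put.add(Bytes.toBytes("f"), Bytes.toBytes("count"), Bytes.toBytes(sum));
    context.write(new ImmutableBytesWritable(rowKey), put);
  }
}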
Below are some good examples from the HBase reference guide of working code with setNumReduceTasks at 0 and at 1 or more.
"51.2. HBase MapReduce Read/Write Example. The following is an example of using HBase both as a source and as a sink with MapReduce. This example will simply copy data from one table to another.

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleReadWrite");
job.setJarByClass(MyReadWriteJob.class);    // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,      // input table
  scan,             // Scan instance to control CF and attribute selection
  MyMapper.class,   // mapper class
  null,             // mapper output key
  null,             // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,      // output table
  null,             // reducer class
  job);
job.setNumReduceTasks(0);

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}
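In that read/write example the mapper itself emits Puts keyed by ImmutableBytesWritable, which is why setNumReduceTasks(0) works there: TableOutputFormat receives the mutations straight from the map phase.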

And here is the one-or-more example:
"51.4. HBase MapReduce Summary to HBase Example. The following example uses HBase as a MapReduce source and sink with a summarization step. This example will count the number of distinct instances of a value in a table and write those summarized counts to another table.

Configuration config = HBaseConfiguration.create();
Job job = new Job(config,"ExampleSummary");
job.setJarByClass(MySummaryJob.class);     // class that contains mapper and reducer

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs

TableMapReduceUtil.initTableMapperJob(
  sourceTable,        // input table
  scan,               // Scan instance to control CF and attribute selection
  MyMapper.class,     // mapper class
  Text.class,         // mapper output key
  IntWritable.class,  // mapper output value
  job);
TableMapReduceUtil.initTableReducerJob(
  targetTable,        // output table
  MyTableReducer.class,    // reducer class
  job);
job.setNumReduceTasks(1);   // at least one, adjust as required

boolean b = job.waitForCompletion(true);
if (!b) {
  throw new IOException("error with job!");
}
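Here it is the summing TableReducer that builds and emits the Puts, so the job needs at least one reduce task.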

http://hbase.apache.org/book.html#mapreduce.example
You seem closer to the first example. I just wanted to show that there is sometimes a reason to set the number of reduce tasks to zero.
