具有多个数据源的两个Map器

0qx6xfy6  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(482)

我写了两个Map器map1和map2
map1-读取hdfs中的seq文件并对其进行处理。
map2—从hbase读取数据并生成与map1相同的键、值对。
最后我把它们合并到了reduceral中。
问题是只有一个Map器正在运行,并且作业完成时没有任何类型的错误。只有最后一个Map器正在运行(即 TableMapReduceUtil ). 如果我交换线路 TableMapReduceUtil 以及 MultipleInputs ,然后是最后一个。 MultipleInputs Map器运行。
我做错什么了?两种方案都不会引发错误。我还阅读了2个文件使用 addCacheFile() 但我想这并不重要。

Job job3 = Job.getInstance(config, "Test");
if (true) {

  job3.setJarByClass(Main.class);

  job3.setMapOutputKeyClass(ImmutableBytesWritable.class);
  job3.setMapOutputValueClass(ImmutableBytesWritable.class);
  job3.setOutputKeyClass(ImmutableBytesWritable.class);
  job3.setOutputValueClass(ImmutableBytesWritable.class);

  job3.getConfiguration().set("StartDate", c_startDate);
  job3.getConfiguration().set("EndDate", c_endDate);

  job3.addCacheFile(new URI(args[8]));
  job3.getConfiguration().set("abc", args[8].substring(args[8].lastIndexOf("/") + 1));

  job3.addCacheFile(new URI(args[9]));
  job3.getConfiguration().set("xyz", args[9].substring(args[9].lastIndexOf("/") + 1));
  job3.setReducerClass(ReducerAll.class);
  job3.setOutputFormatClass(SequenceFileOutputFormat.class);

  job3.setNumReduceTasks(10);

  Scan scan = new Scan();
  scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes("hbasetable"));
  scan.setCaching(300);
  scan.setCacheBlocks(false);

  MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
  TableMapReduceUtil.initTableMapperJob(
          "hbasetable",
          scan,
          Map2.class,
          ImmutableBytesWritable.class,
          ImmutableBytesWritable.class,
          job3);

  FileOutputFormat.setOutputPath(job3, new Path(args[7]));
  job3.waitForCompletion(true);
  if (!job3.waitForCompletion(true)) {
    return (1);
  }
lskq00tm

lskq00tm1#

我相信这种行为是由于以下两个原因lines:-

MultipleInputs.addInputPath(job3, new Path(args[6]), SequenceFileInputFormat.class, Map1.class);
 TableMapReduceUtil.initTableMapperJob(
      "hbasetable",
      scan,
      Map2.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job3);

只有一个工作3
尽管您已经提到有两个Map器,但请查看Map器类型。Map绘制者 Map1 会不同于 Map2 . Map1 是一个 Mapper ,和 Map2 是一个 TableMapper 将这两条语句放在一起并不意味着它们基本上都是在job3的multipleinputs设置中使用的。对于map1,multipleinputs仍然只有一个设置。map2的另一个设置仍然是独立的。
现在开始执行死刑。两种配置multipleinputs或tablemapreduceutil中的后一种会覆盖job3中的前一种配置,因此只执行一个Map器
请让我知道,如果这是不正确的,我还没有验证我的理解,我在我的机器这里提出。

相关问题