java—一个mapreduce程序的输出,作为另一个mapreduce程序的输入

lpwwtiir  于 2021-05-30  发布在  Hadoop
关注(0)|答案(2)|浏览(350)

我正在尝试一个简单的示例,其中一个mapreduce作业的输出应该是另一个mapreduce作业的输入。
流程应该是这样的: Mapper1 --> Reducer1 --> Mapper2 --> Reducer2 (mapper1的输出必须是reducer1的输入。reducer1的输出必须是mapper2的输入。mapper2的输出必须是reducer2的输入。reducer2的输出必须存储在输出文件中)。
如何将多个Map器和还原器添加到我的程序中,使流保持如上所示?
我需要使用链Map器或链减速器吗?如果有的话,我怎么用?

a0x5cqrl

a0x5cqrl1#

我想你要找的是有控制的工作和有控制的工作。这正好符合你的目的。在单个驱动程序类中,可以构建相互依赖的多个作业。下面的代码可能会帮助您理解。

Job jobOne = Job(jobOneConf, "Job-1");
    FileInputFormat.addInputPath(jobOne, jobOneInput);
    FileOutputFormat.setOutputPath(jobOne, jobOneOutput);
    ControlledJob jobOneControl = new ControlledJob(jobOneConf);
    jobOneControl.setJob(jobOne);

    Job jobTwo = Job(jobTwoConf, "Job-2");
    FileInputFormat.addInputPath(jobTwo, jobOneOutput); // here we set the job-1's output as job-2's input
    FileOutputFormat.setOutputPath(jobTwo, jobTwoOutput); // final output
    ControlledJob jobTwoControl = new ControlledJob(jobTwoConf);
    jobTwoControl.setJob(jobTwo);

    JobControl jobControl = new JobControl("job-control");
    jobControl.add(jobOneControl);
    jobControl.add(jobTwoControl);
    jobTwoControl.addDependingJob(jobOneControl); // this condition makes the job-2 wait until job-1 is done

    Thread jobControlThread = new Thread(jobControl);
    jobControlThread.start();
    jobControlThread.join(); 

    /* The jobControl.allFinished() can also be used to wait until all jobs are done */
dy2hfwbg

dy2hfwbg2#

为此,需要实现两个单独的mapreduce作业。第一个作业的结果需要写入一些持久性存储(如hdfs)并由第二个作业读取。sequenceoutputformat/inputformat常用于此。两个mapreduce作业都可以从同一个驱动程序执行。

相关问题