使用jobcontrol hadoop的复杂作业

sg3maiej  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(296)

有没有一种优雅的方法可以使用jobcontrol指定一系列相关作业?
其中还包括一些循环,由于有许多连续的作业(8),如果将这些作业放在一个单独的驱动程序类中,并将它们全部添加到作业控制中,就会造成相当大的混乱。
怎么做?

uxhixvfz

uxhixvfz1#

map1->reduce1->map2->reduce2->map3。。。
通过编写多个驱动程序方法(每个作业一个驱动程序方法),可以以这种方式轻松地将作业链接在一起。调用第一个驱动程序方法,该方法使用jobclient.runjob()运行作业并等待它完成。当该作业完成后,调用next driver方法,该方法将创建一个新的jobconf对象,该对象引用不同的mapper和reducer示例等。链中的第一个作业应将其输出写入一个路径,然后将该路径用作第二个作业的输入路径。这个过程可以重复,因为许多工作是必要的,以达到一个完整的解决方案的问题
作业采用jobconf对象作为其构造函数参数。通过使用adddependingjob()方法,作业可以相互依赖。代码:

x.addDependingJob(y)

表示在y成功完成之前,作业x无法启动。在作业已启动之后,无法将依赖项信息添加到该作业中。给定一组作业,这些作业可以传递给jobcontrol类的示例。jobcontrol可以通过addjob()方法接收单个作业,也可以通过addjobs()接收作业集合
为了example:- if 我们有三个作业a,b和c,顺序是a->b->c

ControlledJob AJob= new ControlledJob(JobConf for A);
ControlledJob BJob= new ControlledJob(JobConf for B);
BJob.addDependingJob(AJob);
ControlledJob CJob= new ControlledJob(JobConf for C);
CJob.addDependingJob(BJob);

JobControl jControl = newJobControl("Name");
jControl.addJob(AJob);
jControl.addJob(BJob);
jControl.addJob(CJob);

Thread runJControl = new Thread(jControl);
runJControl.start();
while (!jControl.allFinished()) {
code = jControl.getFailedJobList().size() == 0 ? 0 : 1;
Thread.sleep(1000);
}
System.exit(1);

我们可以使用单独的getter来获取每个作业的jobconf,其中包含该作业的所有信息。getter的示例代码如下below:-

public static Configuration getAJobConf(Configuration conf, Path ip, Path op)throws IOException {
        final Job AJob = new Job(conf, "name");

        AJob.setJarByClass(Driver.class);

        AJob.setInputFormatClass(InputFormat.class);
        TextInputFormat.addInputPath(AJob, ip);

        TextOutputFormat.setOutputPath(AJob, op);
        AJob.setOutputFormatClass(tOutputFormat.class);

        AJob.setMapperClass(Mapper.class);
        AJob.setReducerClass(Reducer.class);
        AJob.setNumReduceTasks(1);

        AJob.setMapOutputKeyClass(NullWritable.class);
        AJob.setMapOutputValueClass(Text.class);

        AJob.setOutputKeyClass(NullWritable.class);
        AJob.setOutputValueClass(Text.class);
        return AJob.getConfiguration();
    }

相关问题