从驱动程序向mapreduce传递对象

s3fp2yjn  于 2021-05-30  发布在  Hadoop
关注(0)|答案(1)|浏览(353)

我创建了一个驱动程序,它读取一个配置文件,构建一个对象列表(基于配置)并将该列表传递给mapreduce(mapreduce有一个静态属性,该属性保存对该对象列表的引用)。
它只在本地起作用。一旦我在集群配置上运行这个作业,我就会得到各种各样的错误,表明列表还没有建立。这让我觉得我做错了,在集群上,mapreduce是独立于驱动程序运行的。
我的问题是如何正确初始化Map器。
(我使用的是Hadoop2.4.1)

iyzzxitl

iyzzxitl1#

这与边数据分布问题有关。
有两种方法用于侧数据分发。
1) 分布式缓存
2) 配置
当您拥有要共享的对象时,我们可以使用configuration类。
这个讨论将依赖于配置类来提供集群中的对象,所有Map器和(或)还原器都可以访问该对象。这里的方法很简单。配置类的setstring(string,string)setter被用来完成这个任务。必须共享的对象在驱动端被序列化为java字符串,并在Map器或缩减器处反序列化回该对象。
在下面的示例代码中,我使用com.google.gson.gson类实现了简单的序列化和反序列化。您也可以使用java序列化。
类,该类表示需要共享的对象

public class TestBean {
    String string1;
    String string2;
    public TestBean(String test1, String test2) {
        super();
        this.string1 = test1;
        this.string2 = test2;
    }
    public TestBean() {
        this("", "");
    }
    public String getString1() {
        return string1;
    }
    public void setString1(String test1) {
        this.string1 = test1;
    }
    public String getString2() {
        return string2;
    }
    public void setString2(String test2) {
        this.string2 = test2;
    }
}

可以在其中设置配置的主类

public class GSONTestDriver {
    public static void main(String[] args) throws Exception {
        System.out.println("In Main");
        Configuration conf = new Configuration();
        TestBean testB1 = new TestBean("Hello1","Gson1");
        TestBean testB2 = new TestBean("Hello2","Gson2");
        Gson gson = new Gson();
        String testSerialization1 = gson.toJson(testB1);
        String testSerialization2 = gson.toJson(testB2);
        conf.set("instance1", testSerialization1);
        conf.set("instance2", testSerialization2);
        Job job = new Job(conf, " GSON Test");
        job.setJarByClass(GSONTestDriver.class);
        job.setMapperClass(GSONTestMapper.class);
        job.setNumReduceTasks(0);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

可以从中检索对象的Map器类

public class GSONTestMapper extends
Mapper<LongWritable, Text, Text, NullWritable> {
    Configuration conf;
    String inst1;
    String inst2;
    public void setup(Context context) {
        conf = context.getConfiguration();
        inst1 = conf.get("instance1");
        inst2 = conf.get("instance2");
        Gson gson = new Gson();
        TestBean tb1 = gson.fromJson(inst1, TestBean.class);
        System.out.println(tb1.getString1());
        System.out.println(tb1.getString2());
        TestBean tb2 = gson.fromJson(inst2, TestBean.class);
        System.out.println(tb2.getString1());
        System.out.println(tb2.getString2());
    } 
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        context.write(value,NullWritable.get());
    }
}

使用com.google.gson.gson类的tojson(objectsrc)方法将bean转换为序列化的json字符串。然后,序列化的json字符串作为值通过配置示例传递,并通过Map器的名称进行访问。使用同一gson类的fromjson(stringjson,classoft)方法反序列化字符串。代替我的测试bean,你可以放置你的对象。

相关问题