java—如何在单个mapreduce中读取多种类型的avro数据

wooyq4lh  于 2021-06-03  发布在  Hadoop
关注(0)|答案(2)|浏览(396)

我有两种不同类型的avro数据,它们有一些公共字段。我想在Map器中读取那些公共字段。我想通过在集群中生成单个作业来阅读本文。
下面是avro模式示例
模式1:
{“type”:“record”,“name”:“test”,“namespace”:“com..schema.schemaone”,“doc”:“avro storing with schema using mr.”,“fields”:[{“name”:“ee”,“type”:“string”,“default”:null},{“name”:“aa”,“type”:[“null”,“long”],“default”:null},{“name”:“bb”,“type”:[“null”,“string”],“default”:null},{“name”:“cc”,“type”:[“null”,“string”],“default:null}}}
方案2:
{“type”:“record”,“name”:“test”,“namespace”:“com..schema.schematwo”,“doc”:“avro storing with schema using mr.”,“fields”:[{“name”:“ee”,“type”:“string”,“default”:null},{“name”:“aa”,“type”:[“null”,“long”],“default”:null},{“name”:“cc”,“type”:[“null”,“string”],“default”:null},{“name”:“dd”,“type”:[“null”,“string”],“default:null}}}
驾驶员等级:

package com.mango.schema.aggrDaily;

import java.util.Date;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class AvroDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {

        JobConf conf = new JobConf(super.getConf(), getClass());
        conf.setJobName("DF");

        args[0] = "hdfs://localhost:9999/home/hadoop/work/alok/aggrDaily/data/avro512MB/part-m-00000.avro";
        args[1] = "/home/hadoop/work/alok/tmp"; // temp location
        args[2] = "hdfs://localhost:9999/home/hadoop/work/alok/tmp/10";

        FileInputFormat.addInputPaths(conf, args[0]);
        FileOutputFormat.setOutputPath(conf, new Path(args[2]));

        AvroJob.setInputReflect(conf);
        AvroJob.setMapperClass(conf, AvroMapper.class);

        AvroJob.setOutputSchema(
                conf,
                Pair.getPairSchema(Schema.create(Schema.Type.STRING),
                        Schema.create(Schema.Type.INT)));

        RunningJob job = JobClient.runJob(conf);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        long startTime = new Date().getTime();
        System.out.println("Start Time :::::" + startTime);
        Configuration conf = new Configuration();
        int exitCode = ToolRunner.run(conf, new AvroDriver(), args);
        long endTime = new Date().getTime();
        System.out.println("End Time :::::" + endTime);
        System.out.println("Total Time Taken:::"
                + new Double((endTime - startTime) * 0.001) + "Sec.");
        System.exit(exitCode);
    }
}

Map器类:

package com.mango.schema.aggrDaily;

import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;

public class AvroMapper extends
        AvroMapper<GenericData, Pair<CharSequence, Integer>> {

    @Override
    public void map(GenericData record,
        AvroCollector<Pair<CharSequence, Integer>> collector, Reporter reporter) throws IOException {
        System.out.println("record :: " + record);
    }

}

通过设置输入模式,我可以用这段代码读取avro数据。 AvroJob.setInputSchema(conf, new AggrDaily().getSchema()); 由于avro数据在数据中内置了模式,所以我不想显式地将特定的模式传递给作业。我在Pig身上实现了这一点。但是现在我想在mapreduce中也实现同样的功能。
有谁能帮我通过mr代码实现这一点,或者让我知道我哪里做错了?

t40tm48m

t40tm48m1#

通过*org.apache.hadoop.mapreduce.lib.input.multipleinputs类,我们可以通过单个mr作业读取多个avro数据

6yoyoihd

6yoyoihd2#

我们不能使用org.apache.hadoop.mapreduce.lib.input.multipleinputs来读取多个avro数据,因为每个avro输入都有一个与之关联的模式,并且当前上下文只能存储一个输入的模式。所以其他Map绘制者将无法读取数据。。
hcatinputformat也是如此(因为每个输入都有一个与之关联的模式)。但是,在hcatalog 0.14以后的版本中也有相同的规定。
avromultipleinputs可以用来实现同样的功能。它只适用于特定Map和反射Map。它从版本1.7.7开始提供。

相关问题