我有两种不同类型的avro数据,它们有一些公共字段。我想在Map器中读取那些公共字段。我想通过在集群中生成单个作业来阅读本文。
下面是avro模式示例
模式1:
{“type”:“record”,“name”:“test”,“namespace”:“com..schema.schemaone”,“doc”:“avro storing with schema using mr.”,“fields”:[{“name”:“ee”,“type”:“string”,“default”:null},{“name”:“aa”,“type”:[“null”,“long”],“default”:null},{“name”:“bb”,“type”:[“null”,“string”],“default”:null},{“name”:“cc”,“type”:[“null”,“string”],“default:null}}}
方案2:
{“type”:“record”,“name”:“test”,“namespace”:“com..schema.schematwo”,“doc”:“avro storing with schema using mr.”,“fields”:[{“name”:“ee”,“type”:“string”,“default”:null},{“name”:“aa”,“type”:[“null”,“long”],“default”:null},{“name”:“cc”,“type”:[“null”,“string”],“default”:null},{“name”:“dd”,“type”:[“null”,“string”],“default:null}}}
驾驶员等级:
package com.mango.schema.aggrDaily;
import java.util.Date;
import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroJob;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class AvroDriver extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
JobConf conf = new JobConf(super.getConf(), getClass());
conf.setJobName("DF");
args[0] = "hdfs://localhost:9999/home/hadoop/work/alok/aggrDaily/data/avro512MB/part-m-00000.avro";
args[1] = "/home/hadoop/work/alok/tmp"; // temp location
args[2] = "hdfs://localhost:9999/home/hadoop/work/alok/tmp/10";
FileInputFormat.addInputPaths(conf, args[0]);
FileOutputFormat.setOutputPath(conf, new Path(args[2]));
AvroJob.setInputReflect(conf);
AvroJob.setMapperClass(conf, AvroMapper.class);
AvroJob.setOutputSchema(
conf,
Pair.getPairSchema(Schema.create(Schema.Type.STRING),
Schema.create(Schema.Type.INT)));
RunningJob job = JobClient.runJob(conf);
return 0;
}
public static void main(String[] args) throws Exception {
long startTime = new Date().getTime();
System.out.println("Start Time :::::" + startTime);
Configuration conf = new Configuration();
int exitCode = ToolRunner.run(conf, new AvroDriver(), args);
long endTime = new Date().getTime();
System.out.println("End Time :::::" + endTime);
System.out.println("Total Time Taken:::"
+ new Double((endTime - startTime) * 0.001) + "Sec.");
System.exit(exitCode);
}
}
Map器类:
package com.mango.schema.aggrDaily;
import java.io.IOException;
import org.apache.avro.generic.GenericData;
import org.apache.avro.mapred.AvroCollector;
import org.apache.avro.mapred.AvroMapper;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.mapred.Reporter;
public class AvroMapper extends
AvroMapper<GenericData, Pair<CharSequence, Integer>> {
@Override
public void map(GenericData record,
AvroCollector<Pair<CharSequence, Integer>> collector, Reporter reporter) throws IOException {
System.out.println("record :: " + record);
}
}
通过设置输入模式,我可以用这段代码读取avro数据。 AvroJob.setInputSchema(conf, new AggrDaily().getSchema());
由于avro数据在数据中内置了模式,所以我不想显式地将特定的模式传递给作业。我在Pig身上实现了这一点。但是现在我想在mapreduce中也实现同样的功能。
有谁能帮我通过mr代码实现这一点,或者让我知道我哪里做错了?
2条答案
按热度按时间t40tm48m1#
通过*org.apache.hadoop.mapreduce.lib.input.multipleinputs类,我们可以通过单个mr作业读取多个avro数据
6yoyoihd2#
我们不能使用org.apache.hadoop.mapreduce.lib.input.multipleinputs来读取多个avro数据,因为每个avro输入都有一个与之关联的模式,并且当前上下文只能存储一个输入的模式。所以其他Map绘制者将无法读取数据。。
hcatinputformat也是如此(因为每个输入都有一个与之关联的模式)。但是,在hcatalog 0.14以后的版本中也有相同的规定。
avromultipleinputs可以用来实现同样的功能。它只适用于特定Map和反射Map。它从版本1.7.7开始提供。