我所有的程序都是用hadoop的新mr1接口(org.apache.hadoop.mapreduce)编写的,所以我也想使用avro的新org.apache.avro.mapreduce。但这对我没用。
程序接收avro数据的输入并输出相同的数据。我的程序背后的主要思想是根据avro Package 的键/值对hadoop的mapper和reducer进行子类化。以下是我的工作驱动程序:
AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());
job.setMapperClass(MyAvroMap.class);
job.setReducerClass(MyAvroReduce.class);
job.setInputFormatClass(AvroKeyInputFormat.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
job.setMapOutputKeyClass(AvroKey.class);
job.setMapOutputValueClass(AvroValue.class);
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWritable.class);
分别给出了myavromap和myavroreduce子类的定义
public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }
public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>,
AvroKey<NetflowRecord>, NullWritable>{ ... }
netflowrecord是我的avro记录类。我遇到了一个例外
java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey
通过阅读hadoop和avro的源代码,我发现jobconf抛出了异常,以确保map键是writeablecomparable的子类,如下所示(hadoop1.2.1,line759)
WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));
但是avro表明avrokey和avrovalue只是一个简单的 Package 器,没有对hadoop的可写*接口进行子类化。
我相信,即使没有测试,我也可以通过使用旧的mapred接口,但这不是我想要的。你能给我举几个例子或者解释一下用纯org.apache.avro.mapreduce接口编程吗??
真诚地,
贾明
1条答案
按热度按时间7z5jn7bk1#
在这个补丁的帮助下https://issues.apache.org/jira/browse/avro-593,我发现每个avrokey和avrovalue Package 器在作业配置中必须有一个模式定义。这就是我错过的。
我有两个选择:
如果保持myavromap和myavroreduce不变,我就必须为charsequence定义一个模式,并用avrojob声明这个模式,以便Map器输出,例如
avrojob.setmapoutputkeyschema(作业,<“字符序列的定义模式”>);avrojob.setmapoutputvalueschema(作业,netflowrecord.getclassschema());
通过将Map器输出键/值更改为text/avrovalue,我只需要为Map器输出值添加模式声明,如
job.setmapoutputkeyclass(text.class);avrojob.setmapoutputvalueschema(作业,netflowrecord.getclassschema());
使用mapreduceapi,我们不再需要为avromapper和avroreducer创建子类。在这里,我在代码中实现option2而不需要额外的模式定义。
贾明