如何使用avro org.apache.avro.mapreduce接口编程?

pjngdqdw  于 2021-06-03  发布在  Hadoop
关注(0)|答案(1)|浏览(315)

我所有的程序都是用hadoop的新mr1接口(org.apache.hadoop.mapreduce)编写的,所以我也想使用avro的新org.apache.avro.mapreduce。但这对我没用。
程序接收avro数据的输入并输出相同的数据。我的程序背后的主要思想是根据avro Package 的键/值对hadoop的mapper和reducer进行子类化。以下是我的工作驱动程序:

AvroJob.setInputKeySchema(job, NetflowRecord.getClassSchema());
    AvroJob.setOutputKeySchema(job, NetflowRecord.getClassSchema());

    job.setMapperClass(MyAvroMap.class);
    job.setReducerClass(MyAvroReduce.class);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroKeyOutputFormat.class);

    job.setMapOutputKeyClass(AvroKey.class);
    job.setMapOutputValueClass(AvroValue.class);

    job.setOutputKeyClass(AvroKey.class);
    job.setOutputValueClass(NullWritable.class);

分别给出了myavromap和myavroreduce子类的定义

public static class MyAvroMap extends Mapper<AvroKey<NetflowRecord>, NullWritable,
            AvroKey<CharSequence>, AvroValue<NetflowRecord>>{ ... }

public static class MyAvroReduce extends Reducer<AvroKey<CharSequence>, AvroValue<NetflowRecord>, 
                AvroKey<NetflowRecord>, NullWritable>{ ... }

netflowrecord是我的avro记录类。我遇到了一个例外

java.lang.ClassCastException: class org.apache.avro.hadoop.io.AvroKey

通过阅读hadoop和avro的源代码,我发现jobconf抛出了异常,以确保map键是writeablecomparable的子类,如下所示(hadoop1.2.1,line759)

WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class));

但是avro表明avrokey和avrovalue只是一个简单的 Package 器,没有对hadoop的可写*接口进行子类化。
我相信,即使没有测试,我也可以通过使用旧的mapred接口,但这不是我想要的。你能给我举几个例子或者解释一下用纯org.apache.avro.mapreduce接口编程吗??
真诚地,
贾明

7z5jn7bk

7z5jn7bk1#

在这个补丁的帮助下https://issues.apache.org/jira/browse/avro-593,我发现每个avrokey和avrovalue Package 器在作业配置中必须有一个模式定义。这就是我错过的。
我有两个选择:
如果保持myavromap和myavroreduce不变,我就必须为charsequence定义一个模式,并用avrojob声明这个模式,以便Map器输出,例如
avrojob.setmapoutputkeyschema(作业,<“字符序列的定义模式”>);avrojob.setmapoutputvalueschema(作业,netflowrecord.getclassschema());
通过将Map器输出键/值更改为text/avrovalue,我只需要为Map器输出值添加模式声明,如
job.setmapoutputkeyclass(text.class);avrojob.setmapoutputvalueschema(作业,netflowrecord.getclassschema());
使用mapreduceapi,我们不再需要为avromapper和avroreducer创建子类。在这里,我在代码中实现option2而不需要额外的模式定义。
贾明

相关问题