我正在将一个hadoop作业的avro输出输入到另一个hadoop作业中。第一个作业只是使用以下设置运行一个Map器。如果有任何用处,我的avsc文件会定义如下的复合对象:
[
{
"type": "record",
"name": "MySubRecord",
"namespace": "blah",
"fields": [
{"name": "foobar", "type": ["null","string"], "default":null},
{"name": "bar","type": ["null","string"], "default":null},
{"name": "foo","type": ["null","string"], "default":null},
]
},{
"type": "record",
"name": "MyRecord",
"namespace" : "blah",
"fields" : [
{"name": "ID", "type":["null", "string"], "default":null},
{"name": "secondID", "type":["null", "string"], "default":null},
{"name": "subRecordA", "type":["null","blah.MySubRecord"], "default":null},
{"name": "subRecordB", "type":["null","blah.MySubRecord"], "default":null},
{"name": "subRecordC", "type":["null","blah.MySubRecord"], "default":null},
{"name": "subRecordD", "type":["null","blah.MySubRecord"], "default":null},
{"name": "subRecordE", "type":["null","blah.MySubRecord"], "default":null},
{"name": "subRecordF", "type":["null","blah.MySubRecord"], "default":null},
{"name": "subRecordG", "type":["null","blah.MySubRecord"], "default":null},
{"name": "subRecordH", "type":["null","blah.MySubRecord"], "default":null}
]
}
]
我的Map器类签名如下所示:
public static class MyMapper extends Mapper<LongWritable, Text, AvroKey<MyRecord>, NullWritable>
使用如下设置方法:
protected void setup(Context context) throws IOException, InterruptedException {
super.setup(context);
keyOut = new AvroKey<>();}
Map程序代码如下所示
protected void map(LongWritable keyIn, Text valueIn, Context context) throws IOException, InterruptedException {
MyRecord record;
record = getMyRecordFunction();
keyOut.datum(record);
context.write(keyOut, NullWritable.get());
}
我在第一份工作中的逻辑看起来不错,因为当我使用命令行avrotools jar将输出打印为json时,它看起来和我期望的一样。
我的问题发生在我运行第二个作业时。我的第二个作业的Map器具有以下设置:
public static class MySecondJobMapper extends Mapper<AvroKey<MyRecord>, NullWritable, IntWritable, DoubleWritable>
我的问题发生在我第二份工作中map方法的最开始。我的Map方法如下所示:
protected void map(AvroKey<MyRecord> key, NullWritable value, Context context) throws IOException, InterruptedException {
MyRecord myRecord = key.datum();
##### some other logic
每次运行第二个作业时,都会出现以下错误:
16/07/28 18:24:38 WARN mapred.LocalJobRunner: job_local1682958846_0001
java.lang.Exception: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to MyRecord
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to MyRecord
at your.class.path$StatsCalculatorMapper.map(YourSecondJob.java:150)
at your.class.path$StatsCalculatorMapper.map(YourSecondJob.java:110)
at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
1条答案
按热度按时间nfg76nw01#
看起来问题源于这样一个事实:我在本地、伪分布式环境中进行测试,而pom.xml中指定的正确avro版本没有被拉入。取而代之的是,一个带有这个bug的旧版本的avro在我还没意识到的情况下就被拉了进来。一旦我在emr上运行了相同的程序,它就运行得很好,因为使用的是avro的正确版本。