Why does my AvroKey data say it is a generic record when I explicitly write the data as AvroKey&lt;SpecificRecord&gt;?

Asked by qf9go6mv on 2021-05-29 in Hadoop

I am feeding the Avro output of one Hadoop job into a second Hadoop job. The first job is map-only and is set up as shown below. In case it is of any use, my .avsc file defines a composite object like this:

[
{
"type": "record",
"name": "MySubRecord",
"namespace": "blah",
"fields": [
    {"name": "foobar", "type": ["null","string"], "default":null},
    {"name": "bar","type": ["null","string"], "default":null},
    {"name": "foo","type": ["null","string"], "default":null},
]
},{
"type": "record",
"name": "MyRecord",
"namespace" : "blah",
"fields" : [
       {"name": "ID", "type":["null", "string"], "default":null},
       {"name": "secondID", "type":["null", "string"], "default":null},
       {"name": "subRecordA", "type":["null","blah.MySubRecord"], "default":null},
       {"name": "subRecordB", "type":["null","blah.MySubRecord"], "default":null},
       {"name": "subRecordC", "type":["null","blah.MySubRecord"], "default":null},
       {"name": "subRecordD", "type":["null","blah.MySubRecord"], "default":null},
       {"name": "subRecordE", "type":["null","blah.MySubRecord"], "default":null},
       {"name": "subRecordF", "type":["null","blah.MySubRecord"], "default":null},
       {"name": "subRecordG", "type":["null","blah.MySubRecord"], "default":null},
       {"name": "subRecordH", "type":["null","blah.MySubRecord"], "default":null}
]
}
]

My mapper class signature looks like this:

public static class MyMapper extends Mapper<LongWritable, Text, AvroKey<MyRecord>, NullWritable>

with a setup method like this:

protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        keyOut = new AvroKey<>(); // keyOut is a mapper field of type AvroKey<MyRecord>, reused across map() calls
    }

The map code looks like this:

protected void map(LongWritable keyIn, Text valueIn, Context context) throws IOException, InterruptedException {
        MyRecord record;
        record = getMyRecordFunction();
        keyOut.datum(record);
        context.write(keyOut, NullWritable.get());
    }

The logic in my first job seems fine, because when I print the output as JSON with the command-line avro-tools jar, it looks exactly as I expect.
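For reference, the output side of a map-only Avro job is typically wired up in the driver roughly like this (a minimal sketch; my actual driver code isn't shown above, so the names and settings here are assumptions):

// Sketch of the first job's driver wiring, using the org.apache.avro.mapreduce classes.
Job job = Job.getInstance(conf, "first-job"); // conf: a Hadoop Configuration; the job name is a placeholder
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0); // map-only job
job.setOutputKeyClass(AvroKey.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(AvroKeyOutputFormat.class);
AvroJob.setOutputKeySchema(job, MyRecord.getClassSchema()); // declare the specific schema for the output key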
My problem happens when I run the second job. The second job's mapper has the following signature:

public static class MySecondJobMapper extends Mapper<AvroKey<MyRecord>, NullWritable, IntWritable, DoubleWritable>

The problem occurs at the very beginning of the second job's map method, which looks like this:

protected void map(AvroKey<MyRecord> key, NullWritable value, Context context) throws IOException, InterruptedException {
        MyRecord myRecord = key.datum(); // the ClassCastException below is thrown on this implicit cast

        // ... some other logic
    }
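For completeness, the second job's input side would normally be configured along these lines (again a sketch with assumed names); setting the input key schema to the generated class's schema is what should let AvroKeyInputFormat hand the mapper MyRecord instances rather than generic records:

// Sketch of the second job's input wiring (org.apache.avro.mapreduce classes).
Job job2 = Job.getInstance(conf, "second-job");
job2.setMapperClass(MySecondJobMapper.class);
job2.setInputFormatClass(AvroKeyInputFormat.class);
AvroJob.setInputKeySchema(job2, MyRecord.getClassSchema()); // read with the specific schema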

Every time I run the second job, I get the following error:

16/07/28 18:24:38 WARN mapred.LocalJobRunner: job_local1682958846_0001
java.lang.Exception: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to MyRecord
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to MyRecord
    at your.class.path$StatsCalculatorMapper.map(YourSecondJob.java:150)
    at your.class.path$StatsCalculatorMapper.map(YourSecondJob.java:110)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
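From the trace, the input format is handing the mapper a GenericData.Record instead of an instance of the generated MyRecord class, so the implicit cast in map() fails. As a stopgap (a defensive sketch only, not a root-cause fix), the generic record can be deep-copied into the specific class via org.apache.avro.specific.SpecificData:

// Defensive sketch inside map(): tolerate a GenericData.Record by deep-copying
// it into the generated class. This masks the symptom; the real fix is getting
// the right Avro version and configuration in place.
Object datum = key.datum(); // assigned to Object, so no cast (and no ClassCastException) yet
MyRecord myRecord = (datum instanceof MyRecord)
        ? (MyRecord) datum
        : (MyRecord) SpecificData.get().deepCopy(MyRecord.getClassSchema(), datum);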
Answer (by nfg76nw0):

It looks like the problem stemmed from the fact that I was testing in a local, pseudo-distributed environment, where the correct Avro version specified in my pom.xml was not being pulled in. Instead, an older version of Avro containing this bug was pulled in without my realizing it. Once I ran the same program on EMR, it worked fine, because the correct version of Avro was used.
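To confirm which Avro build actually ends up on the classpath at runtime, one option (a small diagnostic sketch, not part of the fix itself) is to print where the Schema class was loaded from:

// Quick runtime check: print which jar org.apache.avro.Schema was loaded from.
// If this points at an unexpectedly old avro jar, that's the conflict at work.
System.out.println("Avro loaded from: "
        + org.apache.avro.Schema.class.getProtectionDomain().getCodeSource().getLocation());

On the build side, mvn dependency:tree -Dincludes=org.apache.avro shows which dependency drags in the conflicting version.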
