avromultipleinputs-issue添加多个路径

8iwquhpp  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(329)

下面是我用来添加具有不同Map器类的多个avro输入路径的驱动程序代码片段

AvroMultipleInputs.addInputPath(jobConf, new Path(args[0]), IncrementalDataMapper.class, incrSchema);
AvroMultipleInputs.addInputPath(jobConf, new Path(args[1]), BaseDataMapper.class, incrSchema);

AvroJob.setMapOutputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema));

AvroJob.setReducerClass(jobConf, DeltaCaptureReducer.class);
AvroJob.setInputSchema(jobConf, Pair.getPairSchema(Schema.create(Type.STRING), incrSchema));
AvroJob.setOutputSchema(jobConf, incrSchema);

当我运行这个驱动程序时,我得到以下来自avromultipleinputs的getinputschemamap(…)方法的异常
java.lang.runtimeexception:org.apache.avro.schemaparseexception:无法重新定义:com.sample.test
现在我所做的是在一个独立的程序中模拟avromultipleinputs的getinputschemamap(…)方法来产生相同的问题。
独立代码
失败的代码,

Schema.Parser schemaParser = new Schema.Parser();
    String m1 = "path1;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }");
    String m2 = "path2;" + toBase64("{\"type\":\"record\",\"name\":\"Test\",\"namespace\":\"com.sample\",\"fields\":[ {\"name\":\"BATCH_ID\",\"type\":[\"null\",\"long\"]} ] }");
    String[] schemaMappings = (m1 + "," + m2).split(",");
    for (String schemaMapping : schemaMappings) {
        String[] split = schemaMapping.split(";");
        String schemaString = fromBase64(split[1]);
        System.out.println(schemaString);
        Schema inputSchema;
        try {
            inputSchema = schemaParser.parse(schemaString);
        } catch (SchemaParseException e) {
            throw new RuntimeException(e);
        }
    }

现在我通过为每个Map创建解析器来解决这个问题,如下所示。

for (String schemaMapping : schemaMappings) {
        String[] split = schemaMapping.split(";");
        String schemaString = fromBase64(split[1]);
        System.out.println(schemaString);
        Schema inputSchema;
        try {
            Schema.Parser schemaParser = new Schema.Parser();
            inputSchema = schemaParser.parse(schemaString);
        } catch (SchemaParseException e) {
                throw new RuntimeException(e);
        }
}

有人试过这个吗?有什么好主意吗?
我还尝试将avromultipleinputs复制到我的项目中,并将代码更改为使用上述不同的解析器,但出现以下异常
org.apache.hadoop.mapred.lib.multipleinputs.getinputformatmap(multipleinputs)上的线程“main”java.lang.nullpointerexception中出现异常。java:93)位于org.apache.hadoop.mapred.lib.delegatinginputformat.getsplits(delegatinginputformat)。java:55)在org.apache.hadoop.mapreduce.jobsubmitter.writeoldsplits(jobsubmitter。java:328)在org.apache.hadoop.mapreduce.jobsubmitter.writesplits(jobsubmitter。java:320)在org.apache.hadoop.mapreduce.jobsubmitter.submitjobinternal(jobsubmitter。java:196)在org.apache.hadoop.mapreduce.job$10.run(job。java:1290)

zkure5ic

zkure5ic1#

事实上,我必须自定义更多的文件,使其工作。我仍然不确定是否还有其他影响(我不知道我不知道的是什么)
avromultipleinputs.java文件
delegatinginputformat.java文件
delegatingmapper.java文件
Map收集器.java
taggedinputsplit.java文件

相关问题