如何使用avroparquetoutputformat设置多个avro模式?

vjrehmav  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(318)

在我的mapreduce工作中,我使用avroparquetoutputformat使用avro模式写入Parquet文件。
应用程序逻辑需要由reducer创建多种类型的文件,每个文件都有自己的avro模式。
类avroparquetoutputformat有一个静态方法setschema(),用于设置输出的avro模式。查看代码,avroparquetoutputformat使用avrowritesupport.setschema(),这也是一个静态实现。
在不扩展avrowritesupport和破解逻辑的情况下,有没有更简单的方法在单个mr作业中实现avroparquetoutputformat的多个avro模式输出?
任何指针/输入都非常感谢。
谢谢和问候
mk公司

vc9ivgsu

vc9ivgsu1#

现在回答这个问题可能已经很晚了,但我也遇到了这个问题,并提出了解决办法。
首先,没有像这样的支持 MultipleAvroParquetOutputFormat '内置 parquet-mr . 但为了达到类似的效果 MultipleOutputs .
对于只使用Map的工作,请按以下方式放置Map绘制器:

public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord>{

    protected  KafkaAvroDecoder deserializer;
    protected String outputPath = "";

    // Using MultipleOutputs to write custom named files
    protected MultipleOutputs<Void, GenericRecord> mos;

    public void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();           
        outputPath = conf.get(FileOutputFormat.OUTDIR);
        mos = new MultipleOutputs<Void, GenericRecord>(context);
    }

    public void map(LongWritable ln, BytesWritable value, Context context){

        try {
            GenericRecord record = (GenericRecord) deserializer.fromBytes(value.getBytes());
            AvroWriteSupport.setSchema(context.getConfiguration(), record.getSchema());
            Schema schema = record.getSchema();
            String mergeEventsPath = outputPath + "/" + schema.getName(); // Adding '/' will do no harm 
            mos.write( (Void) null, record, mergeEventsPath);

        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

}

这将创建一个新的 RecordWriter 为每个模式创建一个新的parquet文件,并附加模式名称,例如schema1-r-0000.parquet。
这还将根据驱动程序中设置的模式创建默认的part-r-0000x.parquet文件。要避免这种情况,请使用 LazyOutputFormat 比如:

LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);

希望这有帮助。

相关问题