将Avro文件转换为文本文件的Java程序

xmq68pz9  于 2021-05-29  发布在  Hadoop
关注(0)|答案(4)|浏览(344)

我需要一个Java程序:以Avro模式文件(.avsc)和Avro数据文件作为输入,将数据转换为文本文件。

mlmc2os5

mlmc2os51#

这段Java代码对我很有用,希望对其他人也有帮助。除下面的Avro导入外,还需要:

import java.io.*;
import java.util.*;

import org.apache.avro.*;
import org.apache.avro.generic.*;
import org.apache.avro.file.*;
import org.apache.avro.io.*;

public class AvrotoTextFormatter

{

public static void main ( String args[]) throws Exception

{

    InputStream in = null;
    in = new FileInputStream(args[0]);
    BufferedReader br;
    BufferedInputStream inStream = new BufferedInputStream(in);
    PrintWriter pr1 = new PrintWriter(args[1], "UTF-8");
    PrintWriter pr = new PrintWriter(args[2], "UTF-8");
    StringTokenizer st;
    StringTokenizer st1;
    int row_counter = 0;
    String header_fields = "";
    String content_records = "";
    String sCurrentLine = "";

    GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
    DataFileStream<Object> fileReader = new DataFileStream<Object>(inStream, reader);

    pr1.println(fileReader.getSchema().getFields());

    pr1.close();

    br = new BufferedReader(new java.io.FileReader(args[1]));

    while ((sCurrentLine = br.readLine()) != null)
    {
    st = new StringTokenizer(sCurrentLine," ");
    while (st.hasMoreTokens())
    {
        header_fields = header_fields + st.nextToken() + "|";
        st.nextToken();
        st.nextToken();             
    }        
    }
    header_fields = header_fields.substring(1,header_fields.length()-1); 
    pr.println(header_fields);

    File file = new File(args[0]);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(fileReader.getSchema());
    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
    GenericRecord user = null;
        while (dataFileReader.hasNext())
           {
              content_records = "";
              user = dataFileReader.next(user);
              st1 = new StringTokenizer(header_fields,"|");
              while (st1.hasMoreTokens())
            {
                content_records = content_records + user.get(st1.nextToken()) + "|";              
                }
    content_records = content_records.substring(0,content_records.length()-1);
    pr.println(content_records); 
            } 
    fileReader.close();
    br.close();
    pr.close();  

}

}
mfpqipee

mfpqipee2#

我发现用Spark实现同样功能的代码片段更快、更简单,分享出来,希望对他人有帮助。

import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.avro.mapred.AvroKey
import org.apache.hadoop.io.NullWritable
// Read the Avro file. Each AvroKey's toString renders the datum as JSON-like text,
// e.g. {"name": "bob", "age": 5}.
// NOTE(review): the input path is hard-coded; replace it with your own file.
val avroRdd = sc.newAPIHadoopFile("/sit/data/presentation/bbsbi/alayer/test/000000_0", classOf[AvroKeyInputFormat[String]], classOf[AvroKey[String]], classOf[NullWritable]).keys.map(_.toString)

// Turn each flat record into a pipe-delimited line of its values.
// - Strips the surrounding braces.
// - Splits each key/value pair on the FIRST ':' only, so values such as "12:30" survive.
// - Removes quotes only from quoted values. Numbers, booleans and null are kept as-is
//   instead of crashing substring() as before.
// NOTE: still a naive parser. Values containing ',' or nested records/arrays need a real
// JSON parser, or reading the fields from AvroKey[GenericRecord] directly.
val n = avroRdd.map { json =>
  val body = json.trim.stripPrefix("{").stripSuffix("}")
  if (body.trim.isEmpty) ""
  else body.split(",").map { pair =>
    val value = pair.split(":", 2)(1).trim
    if (value.length >= 2 && value.startsWith("\"") && value.endsWith("\""))
      value.substring(1, value.length - 1)
    else value
  }.mkString("|")
}
n.collect.foreach(println)
cgh8pdjw

cgh8pdjw3#

我该如何运行下面的程序?也就是说,需要提供的3个命令行参数分别是什么?

import org.apache.avro.*;
import org.apache.avro.generic.*;
import org.apache.avro.file.*;
import org.apache.avro.io.*;

public class AvrotoTextFormatter

{

public static void main ( String args[]) throws Exception

{

    InputStream in = null;
    in = new FileInputStream(args[0]);
    BufferedReader br;
    BufferedInputStream inStream = new BufferedInputStream(in);
    PrintWriter pr1 = new PrintWriter(args[1], "UTF-8");
    PrintWriter pr = new PrintWriter(args[2], "UTF-8");
    StringTokenizer st;
    StringTokenizer st1;
    int row_counter = 0;
    String header_fields = "";
    String content_records = "";
    String sCurrentLine = "";

    GenericDatumReader<Object> reader = new GenericDatumReader<Object>();
    DataFileStream<Object> fileReader = new DataFileStream<Object>(inStream, reader);

    pr1.println(fileReader.getSchema().getFields());

    pr1.close();

    br = new BufferedReader(new java.io.FileReader(args[1]));

    while ((sCurrentLine = br.readLine()) != null)
    {
    st = new StringTokenizer(sCurrentLine," ");
    while (st.hasMoreTokens())
    {
        header_fields = header_fields + st.nextToken() + "|";
        st.nextToken();
        st.nextToken();             
    }        
    }
    header_fields = header_fields.substring(1,header_fields.length()-1); 
    pr.println(header_fields);

    File file = new File(args[0]);
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(fileReader.getSchema());
    DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
    GenericRecord user = null;
        while (dataFileReader.hasNext())
           {
              content_records = "";
              user = dataFileReader.next(user);
              st1 = new StringTokenizer(header_fields,"|");
              while (st1.hasMoreTokens())
            {
                content_records = content_records + user.get(st1.nextToken()) + "|";              
                }
    content_records = content_records.substring(0,content_records.length()-1);
    pr.println(content_records); 
            } 
    fileReader.close();
    br.close();
    pr.close();  

}

}
yshpjwxd

yshpjwxd4#

下面的代码对我有效:

/**
 * Reads every datum from an Avro container file and returns them as a JSON array.
 *
 * <p>Each datum is converted by parsing its {@code toString()} text with {@code JSONObject}.
 * This works only when that text is a JSON object, i.e. the top-level schema is a record.
 * NOTE(review): Avro's generic {@code toString()} is presumably not strict JSON for every
 * value type (e.g. bytes or non-finite floats). Confirm for the schemas this is used with.
 *
 * @param absFilePath path of the Avro data file to read
 * @return one JSONObject per datum, in file order
 * @throws IOException if the file cannot be opened or read as an Avro container
 * @throws InterruptedException never thrown; kept in the signature so existing callers that
 *         catch it still compile
 */
private static JSONArray readJsonFromAvro(String absFilePath)throws IOException,
InterruptedException
{
    JSONArray jsonarray = new JSONArray();
    // Both resources live in the try header. The file is therefore closed even when the
    // DataFileStream constructor rejects it; the previous version leaked the stream then.
    try (InputStream inStream = new BufferedInputStream(new FileInputStream(absFilePath));
         DataFileStream<Object> fileReader =
                 new DataFileStream<Object>(inStream, new GenericDatumReader<Object>())) {
        for (final Object datum : fileReader) {
            jsonarray.put(new JSONObject(datum.toString()));
        }
    }
    return jsonarray;
}

相关问题