逐列读取Parquet文件

pbwdgjma  于 2021-06-02  发布在  Hadoop
关注(0)|答案(0)|浏览(262)

我有一个parquet文件,其中包含rdbms表中的存档数据(因此基本上所有值的重复级别都为0)。我需要一次读一列文件。基本上我要执行的顺序是:从Parquet文件的表列中读取所有值->将其写入文本文件->从Parquet文件的下一个表列中读取所有值->将其写入另一个文本文件。。。。以此类推。
在java中最有效的方法是什么?
ps:我首先选择了parquet格式(与avro相反)进行归档,因为parquet帮助我实现了比avro更好的压缩。另外,由于我的要求是一次读取一列的所有值,我的假设是,考虑到格式的固有特性(列存储),使用parquet这样做会更快。
[编辑]下面是我用来逐列读取rdbms表的代码。为简单起见,假设表中的所有列都包含字符串(varchar)值:

public static void main(String[] args) throws IllegalArgumentException {

    Configuration conf = new Configuration();

    try {
        ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
        MessageType schema = readFooter.getFileMetaData().getSchema();
        ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
        PageReadStore rowGroup = null;

        try {

            while (null != (rowGroup = r.readNextRowGroup())) {
                numRowGroups++;
                rows+= rowGroup.getRowCount();

                ColumnReader colReader = null;
                ColumnReadStore colReadStore = new ColumnReadStoreImpl(rowGroup,new GroupRecordConverter(schema).getRootConverter(), schema, "blah");
                List<ColumnDescriptor> descriptorList = schema.getColumns();

                //for each column
                for(ColumnDescriptor colDescriptor:descriptorList) {

                    //Get datatype of the column 
                    PrimitiveTypeName type = colDescriptor.getType();

                    String[] columnNamePath = colDescriptor.getPath();
                    columnName =Arrays.toString(columnNamePath);
                    colReader = colReadStore.getColumnReader(colDescriptor);
                    long totalValuesInColumnChunk =  rowGroup.getPageReader(colDescriptor).getTotalValueCount();

                    //For every cell in the column chunk
                    for (int i = 0; i < totalValuesInColumnChunk; i++) {

                        System.out.println(columnName+" "+colReader.getBinary().toStringUsingUTF8());
                        colReader.consume();

                    }

                } 

            }
        } 
        catch(Exception ex){
            ex.printStackTrace();

        }
        finally {
            r.close();
        }
    } catch (IOException e) {
        System.out.println("Error reading parquet file.");
        e.printStackTrace();
    }

    System.out.println("Total number of rows: "+rows);

}

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题