我正在从流中读取avro消息,并使用parquet.hadoop.parquetwriter将它们写入parquet文件。我正在尝试使输出文件的大小超过阈值限制。问题是parquetwriter将所有内容都保存在内存中,并且只在writer关闭时才将其写入磁盘。基于parquet文档,数据以最终格式写入内存对象,这意味着内存中对象的大小与磁盘上的最终大小相同。我的问题是如何获得内存中写入数据的大小,以便决定关闭writer?
我尝试使用我写入parquetwriter的avro消息的字节大小作为parquet writer文件大小的估计值,但由于parquet存储数据的方式(列格式)不同,这与parquet writer的大小非常不同。以下是我所做工作的伪代码:
ParquetWriter parquetWriter = new ParquetWriter(..., BLOCK_SIZE, PAGE_SIZE);
long bytesWrittenSofar = 0;
public long getLength(){
return bytesWrittenSofar;
}
public void write(org.apache.avro.generic.GenericRecord record){
parquetWriter.write(record);
bytesWrittenSofar += avroToBytes(record).length;
}
public static byte[] avroToBytes(GenericRecord record){
GenericDatumWriter<GenericRecord> writer =
new GenericDatumWriter<GenericRecord>(record.getSchema());
ByteArrayOutputStream out = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
writer.write(record, encoder);
encoder.flush();
out.close();
return out.toByteArray();
}
结果发现,我从getlength()得到的值与Parquet文件的实际文件大小非常不同。我知道模式将被添加到文件的末尾,但这是非常小的。只是给你一个想法,当getlength()报告130mb时,实际的文件大小只有80mb。
2条答案
按热度按时间oprakyz71#
我没有找到访问parquetwriter使用的内存的方法。相反,我最终分析了上传的文件大小和其中写入的记录数。使用以前的数据并通过计算写入当前文件的记录数,我估计了当前正在进行的文件的文件大小。结果证明,这比任何其他尝试都更接近真实的文件大小。但是,这在很大程度上取决于应用程序和所写记录的变化。
fkaflof62#
它很可能是api后来添加的内容之一,但在撰写问题时还不可用。
方法
ParquetWriter#getDataSize()
应该给你你需要的。