我正在开发一个实用程序,它一次读取多个Parquet文件,并将它们写入一个输出文件。实现非常简单。这个实用程序从目录中读取Parquet文件,读取 Group
然后使用parquetwrite将所有这些组写入一个文件。
在读取600mb之后,它抛出java堆空间的内存不足错误。读写500mb的数据也需要15-20分钟。
有没有办法让这个行动更有效率?
read方法如下所示:
ParquetFileReader reader = new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER);
ParquetMetadata readFooter = reader.getFooter();
MessageType schema = readFooter.getFileMetaData().getSchema();
ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
reader.close();
PageReadStore pages = null;
try {
while (null != (pages = r.readNextRowGroup())) {
long rows = pages.getRowCount();
System.out.println("Number of rows: " + pages.getRowCount());
MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
for (int i = 0; i < rows; i++) {
Group g = (Group) recordReader.read();
//printGroup(g);
groups.add(g);
}
}
} finally {
System.out.println("close the reader");
r.close();
}
写入方法如下:
for(Path file : files){
groups.addAll(readData(file));
}
System.out.println("Number of groups from the parquet files "+groups.size());
Configuration configuration = new Configuration();
Map<String, String> meta = new HashMap<String, String>();
meta.put("startkey", "1");
meta.put("endkey", "2");
GroupWriteSupport.setSchema(schema, configuration);
ParquetWriter<Group> writer = new ParquetWriter<Group>(
new Path(outputFile),
new GroupWriteSupport(),
CompressionCodecName.SNAPPY,
2147483647,
268435456,
134217728,
true,
false,
ParquetProperties.WriterVersion.PARQUET_2_0,
configuration);
System.out.println("Number of groups to write:"+groups.size());
for(Group g : groups) {
writer.write(g);
}
writer.close();
4条答案
按热度按时间tvokkenx1#
我也面临着同样的问题。对于不太大的文件(高达100兆字节),写入时间可能长达20分钟。尝试使用kite-sdkapi。我知道它看起来像是被遗弃了,但其中有些事情做得非常有效率。如果您喜欢spring,也可以尝试springdatahadoop(这是kitesdkapi上的某种 Package 器)。在我的例子中,这个库的使用将写作时间减少到了2分钟。
例如,您可以用以下方式在parquet中编写(使用spring data hadoop,但使用kite sdkapi编写看起来非常相似):
当然,您需要向您的项目添加一些依赖项(在我的示例中,这是spring data hadoop):
如果您绝对希望只使用本机hadoopapi来实现这一点,那么在任何情况下,查看这些库的源代码都会非常有用,以便高效地实现在parquet文件中的编写。
vwkv1x7d2#
您正在尝试实现的目标已经可以使用
merge
指挥parquet-tools
. 但是,不建议合并小文件,因为它实际上并不合并行组,而是将它们一个接一个地放置(这正是您在问题中描述的方式)。生成的文件可能具有不好的性能特征。但是,如果您想自己实现它,您可以增加堆大小,或者修改代码,使其在写入新文件之前不会将所有文件读入内存,而是逐个读取(或者更好的是,逐行组读取),并立即将它们写入新文件。这样,您只需要在内存中保留一个文件或行组。
ltskdhd13#
我使用这些函数来合并Parquet文件,但它在scala中。不管怎样,这可能会给你一个很好的起点。
x7rlezfr4#
我已经用spark和pyspark脚本实现了一些解决方案,下面是相同的示例代码,这里从目录位置加载多个parquet文件,如果parquet文件模式在文件中有点不同,我们也会合并这些文件。
希望这是一个简短的解决方案。