如何有效地读写Parquet文件?

xzlaal3s  于 2021-05-29  发布在  Hadoop
关注(0)|答案(4)|浏览(390)

我正在开发一个实用程序,它一次读取多个Parquet文件,并将它们写入一个输出文件。实现非常简单。这个实用程序从目录中读取Parquet文件,读取 Group 然后使用parquetwrite将所有这些组写入一个文件。
在读取600mb之后,它抛出java堆空间的内存不足错误。读写500mb的数据也需要15-20分钟。
有没有办法让这个行动更有效率?
read方法如下所示:

ParquetFileReader reader = new ParquetFileReader(conf, path, ParquetMetadataConverter.NO_FILTER);
          ParquetMetadata readFooter = reader.getFooter();
          MessageType schema = readFooter.getFileMetaData().getSchema();
          ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
          reader.close();
          PageReadStore pages = null;
          try {
            while (null != (pages = r.readNextRowGroup())) {
              long rows = pages.getRowCount();
              System.out.println("Number of rows: " + pages.getRowCount());

              MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
              RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
              for (int i = 0; i < rows; i++) {
                Group g = (Group) recordReader.read();
                //printGroup(g);
                groups.add(g);
              }
            }
          } finally {
            System.out.println("close the reader");

            r.close();
          }

写入方法如下:

for(Path file : files){
            groups.addAll(readData(file));
        }

        System.out.println("Number of groups from the parquet files "+groups.size());

        Configuration configuration = new Configuration();
        Map<String, String> meta = new HashMap<String, String>();
        meta.put("startkey", "1");
        meta.put("endkey", "2");
        GroupWriteSupport.setSchema(schema, configuration);
        ParquetWriter<Group> writer = new ParquetWriter<Group>(
                new Path(outputFile),
                new GroupWriteSupport(),
                CompressionCodecName.SNAPPY,
                2147483647,
                268435456,
                134217728,
                true,
                false,
                ParquetProperties.WriterVersion.PARQUET_2_0,
                configuration);
        System.out.println("Number of groups to write:"+groups.size());
        for(Group g : groups) {
            writer.write(g);
        }
        writer.close();
tvokkenx

tvokkenx1#

我也面临着同样的问题。对于不太大的文件(高达100兆字节),写入时间可能长达20分钟。尝试使用kite-sdkapi。我知道它看起来像是被遗弃了,但其中有些事情做得非常有效率。如果您喜欢spring,也可以尝试springdatahadoop(这是kitesdkapi上的某种 Package 器)。在我的例子中,这个库的使用将写作时间减少到了2分钟。
例如,您可以用以下方式在parquet中编写(使用spring data hadoop,但使用kite sdkapi编写看起来非常相似):

final DatasetRepositoryFactory repositoryFactory = new DatasetRepositoryFactory();
repositoryFactory.setBasePath(basePath);
repositoryFactory.setConf(configuration);
repositoryFactory.setNamespace("my-parquet-file");

DatasetDefinition datasetDefinition = new DatasetDefinition(targetClass, true, Formats.PARQUET.getName());
try (DataStoreWriter<T> writer = new ParquetDatasetStoreWriter<>(clazz, datasetRepositoryFactory, datasetDefinition)) {
     for (T record : records) {
        writer.write(record);
     }
     writer.flush();
}

当然,您需要向您的项目添加一些依赖项(在我的示例中,这是spring data hadoop):

<dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop</artifactId>
        <version>${spring.hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop-boot</artifactId>
        <version>${spring.hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop-config</artifactId>
        <version>${spring.hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-hadoop-store</artifactId>
        <version>${spring.hadoop.version}</version>
    </dependency>

如果您绝对希望只使用本机hadoopapi来实现这一点,那么在任何情况下,查看这些库的源代码都会非常有用,以便高效地实现在parquet文件中的编写。

vwkv1x7d

vwkv1x7d2#

您正在尝试实现的目标已经可以使用 merge 指挥 parquet-tools . 但是,不建议合并小文件,因为它实际上并不合并行组,而是将它们一个接一个地放置(这正是您在问题中描述的方式)。生成的文件可能具有不好的性能特征。
但是,如果您想自己实现它,您可以增加堆大小,或者修改代码,使其在写入新文件之前不会将所有文件读入内存,而是逐个读取(或者更好的是,逐行组读取),并立即将它们写入新文件。这样,您只需要在内存中保留一个文件或行组。

ltskdhd1

ltskdhd13#

我使用这些函数来合并Parquet文件,但它在scala中。不管怎样,这可能会给你一个很好的起点。

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetFileWriter}
import org.apache.parquet.hadoop.util.{HadoopInputFile, HadoopOutputFile}
import org.apache.parquet.schema.MessageType

import scala.collection.JavaConverters._

object ParquetFileMerger {
    def mergeFiles(inputFiles: Seq[Path], outputFile: Path): Unit = {
        val conf = new Configuration()
        val mergedMeta = ParquetFileWriter.mergeMetadataFiles(inputFiles.asJava, conf).getFileMetaData
        val writer = new ParquetFileWriter(conf, mergedMeta.getSchema, outputFile, ParquetFileWriter.Mode.OVERWRITE)

        writer.start()
        inputFiles.foreach(input => writer.appendFile(HadoopInputFile.fromPath(input, conf)))
        writer.end(mergedMeta.getKeyValueMetaData)
    }

    def mergeBlocks(inputFiles: Seq[Path], outputFile: Path): Unit = {
        val conf = new Configuration()
        val parquetFileReaders = inputFiles.map(getParquetFileReader)
        val mergedSchema: MessageType =
            parquetFileReaders.
              map(_.getFooter.getFileMetaData.getSchema).
              reduce((a, b) => a.union(b))

        val writer = new ParquetFileWriter(HadoopOutputFile.fromPath(outputFile, conf), mergedSchema, ParquetFileWriter.Mode.OVERWRITE, 64*1024*1024, 8388608)

        writer.start()
        parquetFileReaders.foreach(_.appendTo(writer))
        writer.end(new util.HashMap[String, String]())
    }

    def getParquetFileReader(file: Path): ParquetFileReader = {
        ParquetFileReader.open(HadoopInputFile.fromPath(file, new Configuration()))
    }
}
x7rlezfr

x7rlezfr4#

我已经用spark和pyspark脚本实现了一些解决方案,下面是相同的示例代码,这里从目录位置加载多个parquet文件,如果parquet文件模式在文件中有点不同,我们也会合并这些文件。

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("App_name") \
        .getOrCreate() 

dataset_DF = spark.read.option("mergeSchema", "true").load("/dir/parquet_files/")

dataset_DF.write.parquet("file_name.parquet")

希望这是一个简短的解决方案。

相关问题