Scala: converting Parquet to CSV - writer is slow and resource intensive

Asked by bgibtngc on 2021-05-27 in Hadoop

I am converting Parquet files to CSV in Scala using some of the libraries referenced on the official Apache Parquet page. They are written in Java: https://github.com/parquet/parquet-compatibility
The code works and for the most part I am happy with it, except for the writeGroup code, which is essentially copied from the library linked above. For large files (e.g. an 800 MB Parquet file) it is very slow (more than an hour), memory usage is very high (8 GB for the JVM plus 2 GB for IntelliJ), and CPU usage sometimes hits 100%.
Does anyone know the Parquet format well enough to explain how writeGroup should be written properly?
Code:

import java.io.{BufferedWriter, File, FileWriter}

import com.typesafe.scalalogging.LazyLogging // assumed source of the LazyLogging mixin
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.Group
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetReader}
import org.apache.parquet.schema.MessageType
import wvlet.log.LogSupport // assumed source of the LogSupport mixin

import scala.util.{Failure, Try}

trait ParquetConverter {

  type FilePath = String

  def convertParquetToCsv(targetDir: FilePath)(inputFile: FilePath): Try[FilePath]
}

class ParquetConverterImpl extends ParquetConverter with LazyLogging with LogSupport {

  val CSV_DELIMITER: String = ","
  val JavaNull: Null = null //scalastyle:off

  def convertParquetToCsv(targetDir: FilePath)(inputFile: FilePath): Try[FilePath] = {

    logger.info(s"Converting $inputFile to CSV")

    val hadoopConfiguration: Configuration = new Configuration(true)
    val groupReadSupport: GroupReadSupport = new GroupReadSupport

    def parquetFilePath: Try[Path] = LogTry { new Path(inputFile) }
    def footerMetaData(hadoopConfiguration: Configuration, parquetFilePath: Path): Try[ParquetMetadata] = LogTry { ParquetFileReader.readFooter(hadoopConfiguration, parquetFilePath) }
    def bufferedWriter(csvOutputFile: File): Try[BufferedWriter] = LogTry { new BufferedWriter(new FileWriter(csvOutputFile)) }
    def parquetReader(parquetFilePath: Path, groupReadSupport: GroupReadSupport): Try[ParquetReader[Group]] = LogTry { new ParquetReader[Group](parquetFilePath, groupReadSupport) }

    def csvGroups(parquetReader: ParquetReader[Group]): Try[Stream[Group]] = LogTry { Stream.continually(parquetReader.read()).takeWhile(_ != JavaNull) }

    for {
      parquetFilePath <- parquetFilePath
      footerMetaData <- footerMetaData(hadoopConfiguration, parquetFilePath)
      schema = footerMetaData.getFileMetaData.getSchema
      _ = groupReadSupport.init(hadoopConfiguration, JavaNull, schema)
      csvOutputFile <- getOutputFilePath(targetDir, parquetFilePath)
      bufferedWriter <- bufferedWriter(csvOutputFile)
      _ <- writeInitialHeaders(schema, bufferedWriter)
      parquetReader <- parquetReader(parquetFilePath, groupReadSupport)
      csvGroups <- csvGroups(parquetReader)
      _ <- writeGroups (bufferedWriter, csvGroups, schema)
      _ <- closeReader(parquetReader)
      _ <- closeWriter(bufferedWriter)
    } yield { csvOutputFile.getAbsolutePath }
  }

  private def LogTry[A](computation: => A): Try[A] = {
    Try(computation) recoverWith {
      case e: Throwable =>
        logger.error(e.getMessage)
        Failure(e)
    }
  }

  private def writeGroups(bufferedWriter: BufferedWriter, groups: Seq[Group], schema: MessageType): Try[Unit] = {
    LogTry {
      groups.foreach(group => writeGroup(bufferedWriter, group, schema))
    }
  }

  private def closeWriter(bufferedWriter: BufferedWriter): Try[Unit] = {
    LogTry {
      bufferedWriter.close()
    }
  }

  private def closeReader(parquetReader: ParquetReader[Group]): Try[Unit] = {
    LogTry {
      parquetReader.close()
    }
  }

  private def writeInitialHeaders(schema: MessageType, bufferedWriter: BufferedWriter): Try[Unit] = {
    import scala.collection.JavaConverters._
    LogTry {
      schema.getFields.asScala.foreach(fieldType => bufferedWriter.write(fieldType.getName + CSV_DELIMITER))
      bufferedWriter.write("\n")
    }
  }

The writeGroup below has a lot of problems. It is a copy of the Java version and is nothing remotely functional. While writing this up I discovered that groupToWrite.getValueToString(j, 0) treats a missing value as null, catches that internally and wraps it in a RuntimeException. But if the schema contains optional fields, having no value is perfectly valid, so catching RuntimeException is especially ugly (a rough sketch of an alternative without the catch follows after the class).
I suspect it is all this exception catching, especially being wrapped in the LogTry function, that causes the scaling problems (it works fine for files under 100 MB).

  private def writeGroup(bufferedWriter: BufferedWriter, groupToWrite: Group, schema: MessageType): Unit = {

    // one CSV column per field in the schema
    for (j <- 0 until schema.getFieldCount) {
      if (j > 0) bufferedWriter.write(CSV_DELIMITER) //IOException

      try {
        val valueToString = groupToWrite.getValueToString(j, 0) //RuntimeException
        bufferedWriter.write(valueToString) //IOException
      } catch {
        case _: RuntimeException =>
          bufferedWriter.write("")
      }
    }
    bufferedWriter.write('\n')
  }
}
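
For reference, here is a rough, untested sketch of how writeGroup might avoid the catch: getFieldRepetitionCount (also part of org.apache.parquet.example.data.Group) reports whether an optional field actually has a value, so a missing value can be handled without an exception. The method name and the StringBuilder-per-row idea are just illustrative guesses, not anything from the Parquet docs; it would sit inside ParquetConverterImpl alongside the original writeGroup.

  // Sketch only: checks the repetition count instead of catching RuntimeException,
  // and buffers each row in a StringBuilder so the writer is hit once per record.
  private def writeGroupSketch(bufferedWriter: BufferedWriter, groupToWrite: Group, schema: MessageType): Unit = {
    val row = new StringBuilder
    for (j <- 0 until schema.getFieldCount) {
      if (j > 0) row.append(CSV_DELIMITER)
      // A repetition count of 0 means the (optional) field has no value in this record.
      if (groupToWrite.getFieldRepetitionCount(j) > 0) {
        row.append(groupToWrite.getValueToString(j, 0))
      }
    }
    row.append('\n')
    bufferedWriter.write(row.toString)
  }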
