I am converting parquet files to csv in Scala, using some of the libraries referenced on the official Apache Parquet page. They are written in Java - https://github.com/parquet/parquet-compatibility

This code works and for the most part I'm happy with it, except for the writeGroup code, which is essentially copied from the lib linked above. For large files (e.g. an 800MB parquet file) it is very slow (over an hour), memory usage is very high (8 gigs for Java, 2 gigs for IntelliJ), and CPU usage sometimes sits at 100%.

Does anyone know the parquet format well enough to see how writeGroup should properly be written?

Code:
import java.io.{BufferedWriter, File, FileWriter}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.example.data.Group
import org.apache.parquet.hadoop.example.GroupReadSupport
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetReader}
import org.apache.parquet.schema.MessageType

import scala.util.{Failure, Try}
trait ParquetConverter {
  type FilePath = String

  def convertParquetToCsv(targetDir: FilePath)(inputFile: FilePath): Try[FilePath]
}
class ParquetConverterImpl extends ParquetConverter with LazyLogging with LogSupport {

  val CSV_DELIMITER: String = ","
  val JavaNull: Null = null //scalastyle:off

  def convertParquetToCsv(targetDir: FilePath)(inputFile: FilePath): Try[FilePath] = {
    logger.info(s"Converting $inputFile to CSV")

    val hadoopConfiguration: Configuration = new Configuration(true)
    val groupReadSupport: GroupReadSupport = new GroupReadSupport

    def parquetFilePath: Try[Path] = LogTry { new Path(inputFile) }

    def footerMetaData(hadoopConfiguration: Configuration, parquetFilePath: Path): Try[ParquetMetadata] =
      LogTry { ParquetFileReader.readFooter(hadoopConfiguration, parquetFilePath) }

    def bufferedWriter(csvOutputFile: File): Try[BufferedWriter] =
      LogTry { new BufferedWriter(new FileWriter(csvOutputFile)) }

    def parquetReader(parquetFilePath: Path, groupReadSupport: GroupReadSupport): Try[ParquetReader[Group]] =
      LogTry { new ParquetReader[Group](parquetFilePath, groupReadSupport) }

    // read groups one at a time until the reader returns null (end of file)
    def csvGroups(parquetReader: ParquetReader[Group]): Try[Stream[Group]] =
      LogTry { Stream.continually(parquetReader.read()).takeWhile(_ != JavaNull) }

    for {
      parquetFilePath <- parquetFilePath
      footerMetaData <- footerMetaData(hadoopConfiguration, parquetFilePath)
      schema = footerMetaData.getFileMetaData.getSchema
      _ = groupReadSupport.init(hadoopConfiguration, JavaNull, schema)
      csvOutputFile <- getOutputFilePath(targetDir, parquetFilePath)
      bufferedWriter <- bufferedWriter(csvOutputFile)
      _ <- writeInitialHeaders(schema, bufferedWriter)
      parquetReader <- parquetReader(parquetFilePath, groupReadSupport)
      csvGroups <- csvGroups(parquetReader)
      _ <- writeGroups(bufferedWriter, csvGroups, schema)
      _ <- closeReader(parquetReader)
      _ <- closeWriter(bufferedWriter)
    } yield { csvOutputFile.getAbsolutePath }
  }
  private def LogTry[A](computation: => A): Try[A] = {
    Try(computation) recoverWith {
      case e: Throwable =>
        logger.error(e.getMessage)
        Failure(e)
    }
  }
  private def writeGroups(bufferedWriter: BufferedWriter, groups: Seq[Group], schema: MessageType): Try[Unit] = {
    LogTry {
      groups.foreach(group => writeGroup(bufferedWriter, group, schema))
    }
  }

  private def closeWriter(bufferedWriter: BufferedWriter): Try[Unit] = {
    LogTry {
      bufferedWriter.close()
    }
  }

  private def closeReader(parquetReader: ParquetReader[Group]): Try[Unit] = {
    LogTry {
      parquetReader.close()
    }
  }

  private def writeInitialHeaders(schema: MessageType, bufferedWriter: BufferedWriter): Try[Unit] = {
    import scala.collection.JavaConverters._
    LogTry {
      schema.getFields.asScala.foreach(fieldType => bufferedWriter.write(fieldType.getName + CSV_DELIMITER))
      bufferedWriter.write("\n")
    }
  }
The writeGroup below has a number of problems. It's a copy of the Java version and isn't remotely up to the job. While writing this up I found that groupToWrite.getValueToString(j, 0) will not treat a missing value as null; instead the missing value surfaces as a RuntimeException. But if the schema has optional fields, having no value at all is perfectly valid, so catching RuntimeException for that case is particularly bad.

I suspect it's all this exception catching, especially being wrapped inside the LogTry function, that causes the scaling problem (it works fine for files under 100MB). A rough sketch of the sort of thing I imagine is needed is at the end of the post.
  private def writeGroup(bufferedWriter: BufferedWriter, groupToWrite: Group, schema: MessageType): Unit = {
    for (j <- 0 to schema.getFieldCount) {
      if (j > 0) bufferedWriter.write(CSV_DELIMITER) //IOException
      try {
        val valueToString = groupToWrite.getValueToString(j, 0) //RuntimeException
        bufferedWriter.write(valueToString) //IOException
      } catch {
        case _: RuntimeException =>
          bufferedWriter.write("")
      }
    }
    bufferedWriter.write('\n')
  }
}
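For reference, the direction I imagine a fix would take is to ask the Group whether a field actually has a value before reading it, rather than driving the empty-cell case through exceptions. This is only a sketch and I haven't verified it against real data; it assumes Group.getFieldRepetitionCount(fieldIndex) returns 0 when an optional field is absent, and it uses until rather than to, since the field indices only run up to getFieldCount - 1:

private def writeGroupWithoutExceptions(bufferedWriter: BufferedWriter, groupToWrite: Group, schema: MessageType): Unit = {
  // field indices are 0 .. getFieldCount - 1
  for (j <- 0 until schema.getFieldCount) {
    if (j > 0) bufferedWriter.write(CSV_DELIMITER)
    // only read the value when the (possibly optional) field is present in this record;
    // an absent field simply leaves an empty CSV cell between the delimiters
    if (groupToWrite.getFieldRepetitionCount(j) > 0) {
      bufferedWriter.write(groupToWrite.getValueToString(j, 0))
    }
  }
  bufferedWriter.write('\n')
}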