spark/hadoop对大型lzo文件引发异常

slwdgvem  于 2021-06-04  发布在  Hadoop
关注(0)|答案(3)|浏览(580)

我正在s3中存储的lzo压缩日志文件上运行emr spark作业。同一文件夹中存储了多个日志文件,例如:

...
s3://mylogfiles/2014-08-11-00111.lzo
s3://mylogfiles/2014-08-11-00112.lzo
...

在sparkshell中,我运行一个计算文件行数的作业。如果我为每个文件分别计算行数,则没有问题,例如:

// Works fine
...
sc.textFile("s3://mylogfiles/2014-08-11-00111.lzo").count()
sc.textFile("s3://mylogfiles/2014-08-11-00112.lzo").count()
...

如果使用通配符以一行代码加载所有文件,则会出现两种异常。

// One-liner throws exceptions
sc.textFile("s3://mylogfiles/*.lzo").count()

例外情况如下:

java.lang.InternalError: lzo1x_decompress_safe returned: -6
    at com.hadoop.compression.lzo.LzoDecompressor.decompressBytesDirect(Native Method)

java.io.IOException: Compressed length 1362309683 exceeds max block size 67108864 (probably corrupt file)
    at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:291)

在我看来,除了最后一个例外,所给的文本暗示了解决办法,但我不知道如何继续。lzo文件允许有多大是有限制的,还是有什么问题?
我的问题是:我可以运行spark查询来加载s3文件夹中的所有lzo压缩文件,而不产生与i/o相关的异常吗?
每个文件大约有66个200mb的文件。
编辑:只有在使用hadoop2核心libs(ami3.1.0)运行spark时才会发生异常。使用hadoop1核心libs(ami2.4.5)运行时,一切正常。这两个案例都用spark1.0.1进行了测试。

uurv41yg

uurv41yg1#

昨天,我们在一个emr集群上部署了hive,并且在s3中的一些lzo文件出现了相同的问题,这些文件被另一个非emr集群毫无问题地获取。在对日志进行了一些挖掘之后,我注意到map任务以250mb的块读取s3文件,尽管这些文件肯定是不可拆分的。
原来参数mapreduce.input.fileinputformat.split.maxsize设置为250000000~250mb。这导致lzo从文件中打开一个流,最终导致lzo块损坏。
我将参数mapreduce.input.fileinputformat.split.maxsize=2000000000设置为输入数据的最大文件大小,现在一切正常。
我不确定这与spark到底有什么关系,但是更改inputformat可能会有所帮助,这看起来首先是个问题,正如amazon emr hive与apache hive的区别中所提到的那样。

ndh0cuux

ndh0cuux2#

kgeyti的答案很好,但是: LzoTextInputFormat 引入性能影响,因为它检查每个lzo文件的.index文件。这对于s3上的许多lzo文件来说尤其痛苦(我经历了长达几分钟的延迟,这是由对s3的数千个请求造成的)。
如果您事先知道lzo文件是不可拆分的,那么一个更有效的解决方案是创建一个自定义的、不可拆分的输入格式:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

class NonSplittableTextInputFormat extends TextInputFormat {
    override def isSplitable(context: JobContext, file: Path): Boolean = false
}

然后像这样读文件:

context.newAPIHadoopFile("s3://mylogfiles/*.lzo",
  classOf[NonSplittableTextInputFormat],
  classOf[org.apache.hadoop.io.LongWritable],
  classOf[org.apache.hadoop.io.Text])
.map(_._2.toString)
ukdjmx9f

ukdjmx9f3#

我自己也没有遇到过这个问题,但看起来 .textFile 期望文件是可拆分的,就像cedrik的问题:hive坚持使用 CombineFileInputFormat 您可以索引lzo文件,或者尝试使用 LzoTextInputFormat -我很想知道这对emr是否更有效:

sc.newAPIHadoopFile("s3://mylogfiles/*.lz", 
    classOf[com.hadoop.mapreduce.LzoTextInputFormat],
    classOf[org.apache.hadoop.io.LongWritable],
    classOf[org.apache.hadoop.io.Text])
  .map(_._2.toString) // if you just want a RDD[String] without writing a new InputFormat
  .count

相关问题