我使用以下代码将数据写入hdfs:
def createOrAppendHadoopSnappy(path: Path, hdfs: FileSystem): CompressionOutputStream = {
val compressionCodecFactory = new CompressionCodecFactory(hdfs.getConf)
val snappyCodec = compressionCodecFactory.getCodecByClassName(classOf[org.apache.hadoop.io.compress.SnappyCodec].getName)
snappyCodec.createOutputStream(createOrAppend(path, hdfs))
}
def createOrAppend(path: Path, hdfs: FileSystem): FSDataOutputStream = {
if (hdfs.exists(path)) {
hdfs.append(path)
} else {
hdfs.create(path)
}
}
调用这个函数的代码大致是:
...
val outputStream = new BufferedOutputStream(HdfsUtils.createOrAppendHadoopSnappy(filePath, fileSystem))
...
for(managedOutputStream <- managed(outputStream)) {
IOUtils.writeLines(lines.asJavaCollection, "\n", managedOutputStream, "UTF-8")
}
...
现在,当我用“hadoop fs-text”读取一些文件时,有几个文件被以下消息损坏:java.lang.internalerror:无法解压缩数据。输入无效。
请注意,此代码在yarn上使用spark运行,由于代码中的一些更改,该作业被yarn终止了一次,我也在稍后的测试中手动终止了spark作业。
现在我想尝试重现生成损坏文件的场景,但到目前为止我没有成功。我尝试了不同的方法来中断编写(使用system.exit(0),一个异常,手动ctrl-c)。该文件未完全写入,但未给出java interalerror异常。
是否有人知道在哪些情况下会发生损坏的文件和文件,以及如何/如果可以防止它们?
暂无答案!
目前还没有任何答案,快来回答吧!