如何通过spark打开/stream.zip文件?

von4xj4u  于 2021-06-03  发布在  Hadoop
关注(0)|答案(6)|浏览(414)

我有zip文件,我想打开'通过'Spark。我可以打开.gzip文件没有问题,因为hadoops原生编解码器的支持,但我不能这样做与.zip文件。
有没有一种简单的方法来读取spark代码中的zip文件?我还搜索了要添加到compressioncodecfactory的zip编解码器实现,但到目前为止没有成功。

pgccezyw

pgccezyw1#

请尝试以下代码:

using API sparkContext.newAPIHadoopRDD(
    hadoopConf,
    InputFormat.class,
    ImmutableBytesWritable.class, Result.class)
zqdjd7g9

zqdjd7g92#

这个答案只收集了以前的知识和我分享我的经验。

ZipFile输入格式

我试着跟随@tinku和@jeffll的答案,并使用imported ZipFileInputFormat 一起 sc.newAPIHadoopFile 应用程序编程接口。但这对我不起作用。我不知道如何将cotdp hadoop库放到我的生产集群上。我不负责安装。

紫坪铺溪

@蒂亚戈帕尔马给了一个很好的建议,但他没有完成他的答案,我挣扎了相当长的一段时间,以实际获得解压缩输出。
当我能够这样做的时候,我必须准备好所有的理论方面,你可以在我的回答中找到:https://stackoverflow.com/a/45958182/1549135
但上述答案缺少的部分是阅读 ZipEntry :

import java.util.zip.ZipInputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;   

sc.binaryFiles(path, minPartitions)
      .flatMap { case (name: String, content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}
8cdiaqws

8cdiaqws3#

@用户3591785给我指出了正确的方向,所以我把他的答案标为正确。
要了解更多细节,我可以搜索zipfileinputformat hadoop,并找到以下链接:http://cotdp.com/2012/07/hadoop-processing-zip-files-in-mapreduce/
使用zipfileinputformat及其助手zipfilerecordreader类,我能够让spark完全打开并读取zip文件。

rdd1  = sc.newAPIHadoopFile("/Users/myname/data/compressed/target_file.ZIP", ZipFileInputFormat.class, Text.class, Text.class, new Job().getConfiguration());

结果是一个带有一个元素的Map。文件名作为键,内容作为值,所以我需要将其转换为javapairdd。我相信,如果需要的话,可以用byteswriteable替换文本,用其他东西替换arraylist,但我的目标是首先运行一些东西。

JavaPairRDD<String, String> rdd2 = rdd1.flatMapToPair(new PairFlatMapFunction<Tuple2<Text, Text>, String, String>() {

    @Override
    public Iterable<Tuple2<String, String>> call(Tuple2<Text, Text> textTextTuple2) throws Exception {
        List<Tuple2<String,String>> newList = new ArrayList<Tuple2<String, String>>();

        InputStream is = new ByteArrayInputStream(textTextTuple2._2.getBytes());
        BufferedReader br = new BufferedReader(new InputStreamReader(is, "UTF-8"));

        String line;

        while ((line = br.readLine()) != null) {

        Tuple2 newTuple = new Tuple2(line.split("\\t")[0],line);
            newList.add(newTuple);
        }
        return newList;
    }
});
kcugc4gi

kcugc4gi4#

我遇到了一个类似的问题,我用下面的代码解决了这个问题

sparkContext.binaryFiles("/pathToZipFiles/*")
.flatMap { case (zipFilePath, zipContent) =>

        val zipInputStream = new ZipInputStream(zipContent.open())

        Stream.continually(zipInputStream.getNextEntry)
        .takeWhile(_ != null)
        .flatMap { zipEntry => ??? }
    }
nvbavucw

nvbavucw5#

using API sparkContext.newAPIHadoopRDD(hadoopConf, InputFormat.class, ImmutableBytesWritable.class, Result.class)

文件名应该是pass using conf

conf=( new Job().getConfiguration())
conf.set(PROPERTY_NAME from your input formatter,"Zip file address")
sparkContext.newAPIHadoopRDD(conf, ZipFileInputFormat.class, Text.class, Text.class)

请找到 PROPERTY_NAME 从设置路径的输入格式化程序

jm81lzqq

jm81lzqq6#

python代码没有解决方案,我最近不得不阅读pyspark中的zips。在寻找如何做到这一点的时候,我遇到了一个问题。所以,希望这能帮助其他人。

import zipfile
import io

def zip_extract(x):
    in_memory_data = io.BytesIO(x[1])
    file_obj = zipfile.ZipFile(in_memory_data, "r")
    files = [i for i in file_obj.namelist()]
    return dict(zip(files, [file_obj.open(file).read() for file in files]))

zips = sc.binaryFiles("hdfs:/Testing/*.zip")
files_data = zips.map(zip_extract).collect()

在上面的代码中,我返回了一个字典,其中zip中的filename作为键,每个文件中的文本数据作为值。你可以改变它,无论你想适合你的目的。

相关问题