scala—处理sparkDataframe中的数组[字节]

gjmwrych  于 2021-05-31  发布在  Hadoop
关注(0)|答案(0)|浏览(208)

我有一个Dataframedf1,模式如下:

scala> df1.printSchema
root
 |-- filecontent: binary (nullable = true)
 |-- filename: string (nullable = true)

df有文件名及其内容。内容已压缩。我可以使用下面这样的方法来解压filecontent中的数据并将其保存到hdfs中。

def decompressor(origRow: Row) = {
    val filename = origRow.getString(1)
    val filecontent = serialise(origRow.getString(0))

    val unzippedData = new GZIPInputStream(new ByteArrayInputStream(filecontent))

    val hadoop_fs = FileSystem.get(sc.hadoopConfiguration)
    val filenamePath = new Path(filename)

    val fos = hadoop_fs.create(filenamePath)

    org.apache.hadoop.io.IOUtils.copyBytes(unzippedData, fos, sc.hadoopConfiguration)
    fos.close()
  }

我的目标:
因为df1中的filecontent列数据是二进制的,即数组[byte],所以我不应该将数据分发到一起,并将其传递给函数,以便它可以解压并将其保存到文件中。
我的问题是:
如何不分发数据(列数据)?
如何确保一次处理一行?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题