如何使saveastextfile不将输出拆分为多个文件?

uhry853o  于 2021-07-12  发布在  Spark
关注(0)|答案(9)|浏览(305)

在spark中使用scala时,每当我使用 saveAsTextFile ,它似乎将输出分成多个部分。我只是给它传递一个参数(路径)。

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

输出的数量是否与它使用的减速器的数量相对应?
这是否意味着输出被压缩?
我知道我可以使用bash将输出组合在一起,但是有没有一个选项可以将输出存储在单个文本文件中,而不进行拆分??我看了api文档,但没怎么说。

jqjz2hbq

jqjz2hbq1#

它将其保存为多个文件的原因是计算是分布式的。如果输出足够小,以至于您认为可以在一台机器上安装它,那么您可以用

val arr = year.collect()

然后将结果数组保存为文件,另一种方法是使用自定义分区器, partitionBy ,并使它使所有的东西都到一个分区,尽管这是不可取的,因为你不会得到任何并行化。
如果需要保存文件 saveAsTextFile 你可以用 coalesce(1,true).saveAsTextFile() . 这基本上意味着做计算然后合并到一个分区。你也可以使用 repartition(1) 这只是一个 Package coalesce 将shuffle参数设置为true。查看rdd.scala的源代码是我如何理解大部分内容的,您应该看看。

3ks5zfa0

3ks5zfa02#

对于使用较大数据集的用户: rdd.collect() 在这种情况下不应使用,因为它将收集所有数据作为 Array 在驱动程序中,这是最简单的摆脱内存的方法。 rdd.coalesce(1).saveAsTextFile() 也不应使用,因为在单个节点上执行上游级的并行将丢失,数据将从该节点存储。 rdd.coalesce(1, shuffle = true).saveAsTextFile() 是最好的简单选择,因为它将保持上游任务的处理并行,然后只对一个节点执行洗牌( rdd.repartition(1).saveAsTextFile() 完全是同义词)。 rdd.saveAsSingleTextFile() 如下面所提供的,另外还允许将rdd存储在具有特定名称的单个文件中,同时保留 rdd.coalesce(1, shuffle = true).saveAsTextFile() .
不方便使用的东西 rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") 它实际上生成了一个路径为 path/to/file.txt/part-00000 而不是 path/to/file.txt .
以下解决方案 rdd.saveAsSingleTextFile("path/to/file.txt") 将实际生成一个路径为 path/to/file.txt :

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // SparkContext and be called like this: sc.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )
      // Working with Hadoop 3?: https://stackoverflow.com/a/50545815/9297144

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

可以这样使用:

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

此代码段:
首先将rdd存储为 rdd.saveAsTextFile("path/to/file.txt") 在临时文件夹中 path/to/file.txt.tmp 好像我们不想将数据存储在一个文件中(这样可以使上游任务的处理保持并行)
然后,使用hadoop文件系统api,我们继续合并( FileUtil.copyMerge() )以创建最终输出的单个文件 path/to/file.txt .

t1qtbnec

t1qtbnec3#

你可以打电话 coalesce(1) 然后 saveAsTextFile() -但如果你有很多数据,这可能是个坏主意。每次拆分都会生成单独的文件,就像在hadoop中一样,以便让单独的Map器和还原器写入不同的文件。只有在数据非常少的情况下才有一个输出文件是个好主意,在这种情况下,还可以像@aaronman所说的那样执行collect()。

s8vozzvw

s8vozzvw4#

正如其他人提到的,您可以收集或合并数据集,以强制spark生成单个文件。但这也限制了可以并行处理数据集的spark任务的数量。我更喜欢让它在output hdfs目录中创建100个文件,然后使用 hadoop fs -getmerge /hdfs/dir /local/file.txt 将结果提取到本地文件系统中的单个文件中。当然,当您的输出是一个相对较小的报告时,这是最有意义的。

eanckbw9

eanckbw95#

你可以打电话 repartition() 按照这个方法:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)

var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")

8yparm6h

8yparm6h6#

在spark的下一个版本中,您将能够做到这一点,在当前版本1.0.0中,除非您以某种方式手动完成,例如,如您所提到的,使用bash脚本调用。

zfciruhq

zfciruhq7#

我还想提到的是,文档清楚地指出,当使用真正少量的分区调用coalesce时,用户应该小心。这会导致上游分区继承这个数量的分区。
除非确实需要,否则我不建议使用coalesce(1)。

6qqygrtg

6qqygrtg8#

在spark 1.6.1中,格式如下所示。它只创建一个输出文件。最好的做法是在输出足够小而无法处理时使用它。基本上,它返回一个新的rdd,该rdd被缩减为numpartitions分区。如果要进行剧烈合并,例如,to numpartitions=1,这可能导致您的计算发生在比您喜欢的节点更少的节点上(例如,在numpartitions=1的情况下是一个节点)

pair_result.coalesce(1).saveAsTextFile("/app/data/")
z9zf31ra

z9zf31ra9#

下面是我输出单个文件的答案。我刚加了一句 coalesce(1) ```
val year = sc.textFile("apat63_99.txt")
.map(.split(",")(1))
.flatMap(
.split(","))
.map((,1))
.reduceByKey((
+)).map(.swap)
year.saveAsTextFile("year")

代码:

year.coalesce(1).saveAsTextFile("year")

相关问题