调用collect()时,为什么我的spark工作被卡住了?

2wnc66cl  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(542)

我是新来的。我创建了一个spark作业,它对每个用户进行一些数据处理。我要做的是获取目录中的所有文件并处理这些文件。有多个目录,并且有多个用户。
在读取了一个users目录中的文件之后,我做了一些转换,我需要对它们进行整体处理(比如根据数据删除一些重复的文件)。为此我打电话给 collect() 通过rdd。
当用10个目录运行时,它运行得很好,但是当用1000个目录运行时,它会被卡住 collect() 打电话。
我只做过局部测试。
启动Spark:

private lazy val sparkSession = SparkSession
.builder()
.appName("Custom Job")
.master("local[*]")
.getOrCreate()

读取目录并并行化:

val allDirs: Seq[String] = fs.getAllDirInPath(Configuration.inputDir)
val paths: RDD[String] = SessionWrapper.getSparkContext.parallelize(allDirs)

变换和 collect 电话:

paths.foreachPartition { partition =>
      partition.foreach { dir =>
        val dirData = readDataByDir(dir) // RDD[String]
        val transformed = doTranform(dirData) // RDD[CustomObject]
        val collectedData = tranformed.collect()
        // Do something on collected data
        writeToFile(collectedData)
      }
    }

来自卡住控制台的一些日志:

20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO FileInputFormat: Total input paths to process : 3
20/09/09 19:24:40 INFO CombineFileInputFormat: DEBUG: Terminated node allocation with : CompletedNodes: 1, size left: 65935
20/09/09 19:24:40 INFO DAGScheduler: Got job 2 (collect at MyCustomHelperWithCollectCall.scala:18) with 2 output partitions
20/09/09 19:24:40 INFO DAGScheduler: Final stage: ResultStage 2 (collect at MyCustomHelperWithCollectCall.scala:18)
20/09/09 19:24:40 INFO DAGScheduler: Parents of final stage: List()
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO DAGScheduler: Missing parents: List()
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO DAGScheduler: Submitting ResultStage 2 (MapPartitionsRDD[102] at map at MyCustomHelperWithCollectCall.scala:18), which has no missing parents
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 7.2 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO SparkContext: Starting job: collect at MyCustomHelperWithCollectCall.scala:18
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_14_piece0 stored as bytes in memory (estimated size 3.5 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO BlockManagerInfo: Added broadcast_14_piece0 in memory on 192.168.31.222:55666 (size: 3.5 KiB, free: 2004.3 MiB)
20/09/09 19:24:40 INFO SparkContext: Created broadcast 14 from broadcast at DAGScheduler.scala:1200
20/09/09 19:24:40 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (MapPartitionsRDD[102] at map at MyCustomHelperWithCollectCall.scala:18) (first 15 tasks are for partitions Vector(0, 1))
20/09/09 19:24:40 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
20/09/09 19:24:40 INFO DAGScheduler: Got job 3 (collect at MyCustomHelperWithCollectCall.scala:18) with 1 output partitions
20/09/09 19:24:40 INFO DAGScheduler: Final stage: ResultStage 3 (collect at MyCustomHelperWithCollectCall.scala:18)
20/09/09 19:24:40 INFO DAGScheduler: Parents of final stage: List()
20/09/09 19:24:40 INFO DAGScheduler: Missing parents: List()
20/09/09 19:24:40 INFO DAGScheduler: Submitting ResultStage 3 (MapPartitionsRDD[96] at map at MyCustomHelperWithCollectCall.scala:18), which has no missing parents
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_15 stored as values in memory (estimated size 7.2 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO MemoryStore: Block broadcast_15_piece0 stored as bytes in memory (estimated size 3.5 KiB, free 2001.0 MiB)
20/09/09 19:24:40 INFO BlockManagerInfo: Added broadcast_15_piece0 in memory on 192.168.31.222:55666 (size: 3.5 KiB, free: 2004.3 MiB)
20/09/09 19:24:40 INFO SparkContext: Created broadcast 15 from broadcast at DAGScheduler.scala:1200
20/09/09 19:24:40 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 3 (MapPartitionsRDD[96] at map at MyCustomHelperWithCollectCall.scala:18) (first 15 tasks are for partitions Vector(0))
20/09/09 19:24:40 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
20/09/09 19:24:40 INFO DAGScheduler: Got job 4 (collect at MyCustomHelperWithCollectCall.scala:18) with 1 output partitions
20/09/09 19:24:40 INFO DAGScheduler: Final stage: ResultStage 4 (collect at MyCustomHelperWithCollectCall.scala:18)
20/09/09 19:24:40 INFO DAGScheduler: Parents of final stage: List()
20/09/09 19:24:40 INFO DAGScheduler: Missing parents: List()
20/09/09 19:24:40 INFO DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[101] at map at MyCustomHelperWithCollectCall.scala:18), which has no missing parents
20/09/09 19:24:40 INFO FileInputFormat: Total input paths to process : 5

请帮帮我!

bmvo0sr5

bmvo0sr51#

collect(action)-在驱动程序中将数据集的所有元素作为数组返回。这通常在过滤器或其他返回足够小的数据子集的操作之后有用
好像你的记性不太好不要打对方付费电话

df.show(100)

df.take(100)

同时更新你的spark-dsg图以了解处理过程

相关问题