为什么spark需要为它运行的每个任务序列化rdd中的数据?

hgqdbh6s  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(300)

即使有一个 .cache() d rdd,spark似乎仍然序列化每个任务运行的数据。考虑以下代码:

class LoggingSerializable() extends Externalizable {
  override def writeExternal(out: ObjectOutput): Unit = {
    println("xxx serializing")
  }

  override def readExternal(in: ObjectInput): Unit = {
    println("xxx deserializing")
  }
}

object SparkSer {
  def main(args: Array[String]) = {
    val conf = new SparkConf().setAppName("SparkSer").setMaster("local")
    val spark = new SparkContext(conf)
    val rdd: RDD[LoggingSerializable] = spark.parallelize(Seq(new LoggingSerializable())).cache()
    println("xxx done loading")
    rdd.foreach(ConstantClosure)
    println("xxx done 1")
    rdd.foreach(ConstantClosure)
    println("xxx done 2")
    spark.stop()
  }
}

object ConstantClosure extends (LoggingSerializable => Unit) with Serializable {
  def apply(t: LoggingSerializable): Unit = {
    println("xxx closure ran")
  }
}

它打印出来了

xxx done loading
xxx serializing
xxx deserializing
xxx closure ran
xxx done 1
xxx serializing
xxx deserializing
xxx closure ran
xxx done 2

尽管我打过电话 .cache()rdd ,spark仍然序列化每个调用的数据 .foreach . 官方文件说
当您持久化rdd时,每个节点将其计算的任何分区存储在内存中,并在该数据集(或从该数据集派生的数据集)上的其他操作中重用这些分区。
然后呢 MEMORY_ONLY 方法
将rdd作为反序列化的java对象存储在jvm中。
请注意,spark在序列化任务时尝试序列化数据,但是 ConstantClosure 不关闭任何内容,所以我不明白为什么需要序列化任何数据。
我之所以这么问是因为我希望能够在本地模式下运行spark而不损失任何性能,但是对于每个rdd操作,必须在rdd中序列化大型元素可能会非常昂贵。我不确定这个问题是否是本地模式独有的。spark似乎不可能通过有线将rdd中的数据发送给每个操作的工人,即使rdd被缓存了。
我用的是spark core 3.0.0。

fiei3ece

fiei3ece1#

这是因为你正在使用 parallelize . parallelize 使用特殊的rdd, ParallelCollectionRDD ,将数据放入 Partition s。 Partition 定义一个spark任务,它将被发送给spark任务中的执行者( ShuffleMapTask 或者 ResultTask ). 如果在中打印堆栈跟踪 readExternal 以及 writeExternal ,您应该能够看到它在序列化和反序列化spark任务时发生。
换句话说,数据是spark任务元数据的一部分 ParallelCollectionRDD ,spark必须发送任务以在执行器中运行,这就是序列化发生的地方。
大多数其他RDD从外部系统(如文件)读取数据,因此它们没有这种行为。

6ojccjat

6ojccjat2#

我同意这种行为看起来令人惊讶。在我的脑海里,我可能会猜测,这是因为缓存块是异步的,所有这一切发生得非常快。有可能它只是不等待缓存分区变为可用并在第二次重新计算它。
为了验证这个假设,在第二个foreach之前引入一个长时间的等待,看看这是否会改变事情。

相关问题