Apache Spark 将数据框传递给函数或将新变量分配给数据框会导致数据复制吗?

dldeef67  于 2023-05-18  发布在  Apache
关注(0)|答案(1)|浏览(179)

举个例子

def transform(myDF: DataFrame) (implicit spark: SparkSession): DataFrame = {

     // do something

  }

  val myDf1 =  spark.read.format("avro").load("hdfs://myavrofile")
  val myDf2 = transform(myDf1)

上面的代码是不是效率很低?因为首先我们必须将数据复制到函数中,并将数据重新分配到新的dataframe中。
或者spark内部优化,使dataframe只是一个指针。

ubbxdtey

ubbxdtey1#

再加上Hamza的回答,如果你想从它创建一个分支(例如val df1 = transform1(df); val df2 = transform2(df)),缓存父 Dataframe 是一个好习惯。因为spark创建了一个沿袭,如果你不缓存父 Dataframe ,在每个动作中,计算都是从一开始就完成的。所以:

import org.apache.spark.sql.{DataFrame, SparkSession}

object DataFrameOptimization {

  def main(args: Array[String]): Unit = {
    implicit val spark: SparkSession = SparkSession.builder()
      .appName("DataFrameOptimization")
      .getOrCreate()

    val myDf1 = spark.read.format("avro").load("hdfs://myavrofile").cahce()
    val myDf2 = transform(myDf1, 10)
    val myDf3 = transform(myDf1, 20)

    // Perform some action on myDf2 to trigger the execution
    myDf2.show()
    myDf3.show()
  }

  def transform(myDF: DataFrame, n: Int)(implicit spark: SparkSession): DataFrame = {
    // Perform some transformation on the DataFrame
    // For example, let's filter the rows based on a condition
    myDF.filter(s"column_name >= $n")
  }
}

你可以检查缓存的效果,通过检查每个转换的 Dataframe 的explain("formatted")方法。最后,如果你的数据集很重,你可以通过调用persist而不是cache来改变StorageLevel

相关问题