pyspark: how to confirm whether insertInto uses dynamic partition overwrite?

6yoyoihd, posted on 2023-03-07 in Spark

I have the reusable function below, which handles the initial load with saveAsTable and incremental updates with insertInto. The insertInto path also uses the *dynamic partition overwrite* feature by setting the option partitionOverwriteMode to dynamic. My question is whether there is a way to check in the Spark UI that this feature is actually being used. I think the "number of dynamic part" metric in the DAG below indicates this, but I am not sure.

def saveAsManagedTable(df: DataFrame, dbOrSchemaName: String, tableName: String, partitionColNames: Array[String],
                         comment: String, numPartitions: Int = -1, saveMode: SaveMode = SaveMode.Overwrite,
                         fileFormat: StorageFormat = StorageFormat.Delta, otherWriterOpts: Map[String, String] = Map.empty): Unit = {
    //Check pre-conditions
    require((fileFormat.equals(StorageFormat.Delta) || fileFormat.equals(StorageFormat.Parquet)), "StorageFormat can be one of Delta or Parquet.")

    val spark = SynapseSpark.getActiveSession

    val fullTableName = s"$dbOrSchemaName.$tableName"
    println(
      s"""Saving as : $fullTableName, at location: ${spark.conf.get("spark.sql.warehouse.dir")},
         | with partition cols: ${partitionColNames.mkString(",")} and save mode: $saveMode""".stripMargin)
    
    if (!spark.catalog.databaseExists(dbOrSchemaName)) {
      spark.sql(
        s"""CREATE DATABASE IF NOT EXISTS $dbOrSchemaName
           |COMMENT '$comment'
           |""".stripMargin)
    }
    val tempDf = Partitions.selectCoalesceOrRepartition(df, numPartitions)

    if (!spark.catalog.tableExists(fullTableName)) {
      println(s"$fullTableName doesn't exist in catalog. Creating it.")
      tempDf
        .write
        .partitionBy(partitionColNames: _*)
        .format(fileFormat.format)
        .options(otherWriterOpts)
        .option("encoding", StandardCharsets.UTF_8.name())
        .mode(saveMode)
        .saveAsTable(fullTableName)
    }
    else {
      //insertInto raises "AnalysisException: Table not found." if the table does not exist
      println(s"$fullTableName already exists in catalog. Appending to it.")
      tempDf
        .write
        //.partitionBy(partitionCol) Not required as it retrieves the partition info from catalog
        .format(fileFormat.format)
        .options(otherWriterOpts)
        .option("partitionOverwriteMode", "dynamic") //The dynamic value makes sure that Spark will overwrite only partitions that we have data for in our DataFrame.
        .option("encoding", StandardCharsets.UTF_8.name())
        .mode(saveMode) //mode is not needed as it is "Append" by default
        .insertInto(fullTableName)
    }
  }

n9vozmp41#

You can easily check in the Environment tab of the Spark UI whether dynamic partition overwrite is enabled:

Then search Spark Properties for the conf you are interested in:
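Besides the Environment tab, you can also read the effective value programmatically from the active session. A minimal sketch (spark.sql.sources.partitionOverwriteMode is the standard Spark SQL conf key; how you obtain the session is up to your environment):

import org.apache.spark.sql.SparkSession

// Read the same conf that the Environment tab displays, for the active session.
val spark = SparkSession.builder().getOrCreate()
println(spark.conf.get("spark.sql.sources.partitionOverwriteMode"))
// Prints "dynamic" only when dynamic partition overwrite has been enabled at the
// session level (or cluster-wide); otherwise you get the default static mode.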

Update:

I'm afraid you cannot do this through write.options() if you want to see it in the Spark UI. Spark uses the options passed to .write.options() to configure that particular DataFrame or Dataset write, but they do not affect Spark's runtime or environment, i.e. the configuration settings shown there.
The only solution is to set it before writing with spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic"), or to add it to the spark-defaults.conf file if you want it applied everywhere; see the sketch below.
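For example, a minimal sketch (reusing tempDf and fullTableName from the question; the partitioned target table is assumed to already exist):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
// Enable dynamic partition overwrite at the session level, before the write,
// so it also shows up under Environment -> Spark Properties in the Spark UI.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// With SaveMode.Overwrite and the dynamic mode, only the partitions present in
// tempDf are replaced; all other partitions of the existing table are kept.
tempDf
  .write
  .mode(SaveMode.Overwrite)
  .insertInto(fullTableName)

Set this way, it is a session conf rather than a per-write option, which is why it becomes visible in the Spark UI.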
