I have the reusable function below, which handles the initial load with saveAsTable and incremental updates with insertInto. The insertInto path also uses the dynamic partition overwrite feature by setting the option partitionOverwriteMode to dynamic. My question: is there a way to check in the Spark UI whether this feature is actually being used? I think the number of dynamic part metric in the DAG below indicates this, but I'm not sure.
import java.nio.charset.StandardCharsets

import org.apache.spark.sql.{DataFrame, SaveMode}

//SynapseSpark, StorageFormat and Partitions are helpers from the surrounding project
def saveAsManagedTable(df: DataFrame, dbOrSchemaName: String, tableName: String, partitionColNames: Array[String],
                       comment: String, numPartitions: Int = -1, saveMode: SaveMode = SaveMode.Overwrite,
                       fileFormat: StorageFormat = StorageFormat.Delta, otherWriterOpts: Map[String, String] = Map.empty): Unit = {
  //Check pre-conditions
  require(fileFormat.equals(StorageFormat.Delta) || fileFormat.equals(StorageFormat.Parquet), "StorageFormat can be one of Delta or Parquet.")

  val spark = SynapseSpark.getActiveSession
  val fullTableName = s"$dbOrSchemaName.$tableName"
  println(
    s"""Saving as: $fullTableName, at location: ${spark.conf.get("spark.sql.warehouse.dir")},
       | with partition cols: ${partitionColNames.mkString(",")} and save mode: $saveMode""".stripMargin)

  if (!spark.catalog.databaseExists(dbOrSchemaName)) {
    spark.sql(
      s"""CREATE DATABASE IF NOT EXISTS $dbOrSchemaName
         |COMMENT '$comment'
         |""".stripMargin)
  }

  val tempDf = Partitions.selectCoalesceOrRepartition(df, numPartitions)
  if (!spark.catalog.tableExists(fullTableName)) {
    println(s"$fullTableName doesn't exist in catalog. Creating it.")
    tempDf
      .write
      .partitionBy(partitionColNames: _*)
      .format(fileFormat.format)
      .options(otherWriterOpts)
      .option("encoding", StandardCharsets.UTF_8.name())
      .mode(saveMode)
      .saveAsTable(fullTableName)
  }
  else {
    //insertInto raises "AnalysisException: Table not found." if the table does not already exist
    println(s"$fullTableName already exists in catalog. Appending to it.")
    tempDf
      .write
      //.partitionBy(partitionColNames: _*) is not required: insertInto picks up the partitioning from the catalog
      .format(fileFormat.format)
      .options(otherWriterOpts)
      .option("partitionOverwriteMode", "dynamic") //with "dynamic", Spark overwrites only the partitions present in the DataFrame
      .option("encoding", StandardCharsets.UTF_8.name())
      .mode(saveMode) //with the default SaveMode.Overwrite this becomes an INSERT OVERWRITE; with Append it is a plain insert
      .insertInto(fullTableName)
  }
}
1 Answer
You can easily check whether dynamic partition overwrite is enabled from the Environment tab in the Spark UI: open the tab and search the Spark Properties section for the conf you are interested in, spark.sql.sources.partitionOverwriteMode.
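If you prefer to confirm it from code rather than the UI, here is a minimal sketch; it assumes an active SparkSession named spark:

// Returns the effective session value of the conf
println(spark.conf.get("spark.sql.sources.partitionOverwriteMode"))

// SQL equivalent: SET with a key returns a key/value row with the current setting
spark.sql("SET spark.sql.sources.partitionOverwriteMode").show(false)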
I'm afraid you cannot get this via write.options(), though. Options passed to the .write.options() method configure only that particular DataFrame/Dataset write; they do not change Spark's runtime or environment configuration, so they will not show up in the Spark UI.
The only solution is to set it before writing with spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic"), or to add it to the spark-defaults.conf file if you want it applied everywhere.
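A minimal sketch of that approach, reusing the tempDf and fullTableName names from the question:

// Session-wide setting, picked up by every subsequent write in this session
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

tempDf
  .write
  .mode(SaveMode.Overwrite) // with dynamic mode, only the partitions present in tempDf are replaced
  .insertInto(fullTableName)

And the equivalent line in spark-defaults.conf, if you want it as the cluster-wide default:

spark.sql.sources.partitionOverwriteMode  dynamic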