pyspark 我需要帮助来将SCD 2应用于此Azure Synapse Python代码,该代码将一个框架合并到一个增量表

rlcwz9us  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(92)

我尝试在Azure Synapse Notebook中使用python在维度表中表示SCD类型2。这就是我的代码目前的样子:

(
existing_table.alias(“old”)
   .merge(
       new_table.alias(“new”),
       “old.keycol = new.keycol”
   )
   .whenMatchedUpdate(condition = “old.CurrentRecord = true AND old.Rate <> new.rate”
       ,set = {“old.RecordEndDate” : “new.RecordStartDate”
              ,”old.CurrentRecord” : “false”
              }
       )
   .whenNotMatchedInsert(values = {“old.surrogatekey” : max(“old.surrogatekey”)+1
                                  ,”old.keycol” : “new.keycol”
                                  ,”old.Rate” : “new.Rate”
                                  ,”old.RecordStartDate” : “new.RecorStartDate”
                                  ,”old.CurrentRecord” : “true”
                                  }
       )
   .execute()

 )

我得到一个错误,说:

AnalysisException: cannot resolve old.surrogatekey in INSERT clause given colums new.keycol, new.Rate, new.RecordStartDate

我尝试做的是从表 existing_table 中引用 keycol 列。我想从***keycol***中得到最大值,加1,然后使用结果值作为合并的输入值。
我对这一切都是新的,我真的很感激任何帮助,我可以得到这个完成。太感谢了

cnwbcb6i

cnwbcb6i1#

我尝试了下面的代码,通过跟踪维度数据的历史更改并为新记录分配唯一的键值来执行SCD类型2行为。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_date
from pyspark.sql.window import Window
import pyspark.sql.functions as F
spark = SparkSession.builder.appName("SCDType2Example").getOrCreate()
existing_data = [
    (1, "John Doe", "2023-10-01", "9999-12-31"),
    (2, "Jane Smith", "2023-09-15", "9999-12-31"),
]
existing_schema = ["keycol", "name", "valid_from", "valid_to"]
existing_df = spark.createDataFrame(existing_data, schema=existing_schema)
new_data = [
    ("John Doe", "2023-10-09"),
    ("Jane Smith", "2023-10-09"),
]
new_schema = ["name", "valid_from"]
new_df = spark.createDataFrame(new_data, schema=new_schema)
max_keycol = existing_df.agg({"keycol": "max"}).collect()[0][0]
next_keycol = max_keycol + 1 if max_keycol is not None else 1
window_spec = Window.partitionBy("name").orderBy(F.col("valid_from").desc())
ranked_df = existing_df.withColumn("row_number", F.row_number().over(window_spec))
updated_existing_df = ranked_df.withColumn("valid_to", lit("2023-10-08")).where(col("row_number") == 1).drop("row_number")
new_df = new_df.withColumn("keycol", lit(next_keycol))
next_keycol += 1 
new_df = new_df.withColumn("valid_to", lit("9999-12-31"))
updated_dim_table = updated_existing_df.union(new_df.select(existing_schema))
updated_dim_table.show()

  • 在上面的代码中,使用agg函数查找existing_df中“keycol列的最大值。如果已有记录,则通过加1计算下一个可用键值;否则,它将设置为1。
  • 我创建了一个窗口规范window_spec,用于根据“valid_from”列按降序对existing_df中的行进行排序。
  • 用于标识每个“name”的最新记录。
  • 在指定窗口上使用row_number()existing_df中的行进行排序,并向DataFrame添加“row_number”列。
  • 将现有最新记录(“row_number”等于1的记录)的“valid_to”日期更新为“2023-10-08”,将其标记为不再是当前记录。
  • 通过将下一个可用键值添加到“keycol”列,为新数据分配唯一键值。
  • 将新记录的“valid_to”日期设置为“9999-12-31”,表示它们是没有过期日期的当前记录。
  • 将更新后的现有数据和新数据合并到单个DataFrameupdated_dim_table中。

相关问题