pyspark 修改代码以支持使用数据块在Delta Lake表中删除

fnvucqvd  于 2023-03-28  发布在  Spark
关注(0)|答案(1)|浏览(182)

在Databricks SQL和Databricks Runtime 12.1及更高版本中,可以使用WHEN NOT MATCHED BY SOURCE子句更新或删除目标表中在源表中没有相应记录的记录。Databricks建议添加可选条件子句,以避免完全重写目标表。
下面的代码示例显示了允许更新和新行反映在目标表中。有人可以帮助修改下面的代码,以允许删除也反映在目标表中吗?我认为正确表达此要求的方法是用源表的内容覆盖目标表,并删除目标表中不匹配的记录。

try:
  #Perform a merge into the existing table
  if allowDuplicates == "true":
    (deltadf.alias("t")
       .merge(
        partdf.alias("s"),
        f"s.primary_key_hash = t.primary_key_hash")
      .whenNotMatchedInsertAll()
     .execute()
    )
  else:
    (deltadf.alias("t")
       .merge(
        partdf.alias("s"),
        "s.primary_key_hash = t.primary_key_hash")
      .whenMatchedUpdateAll("s.change_key_hash <> t.change_key_hash")
      .whenNotMatchedInsertAll().
     execute()
    )
  
  action = f"Merged into Existing Delta for table {entityName} at path: {saveloc}"

  spark.sql(f"OPTIMIZE {stageName}{regName}")  
except Exception as e:
  print(e)
3hvapo4f

3hvapo4f1#

在Python API中添加了一个新函数:.whenNotMatchedBySourceDelete()
你需要将它添加到你的代码中:

(deltadf.alias("t")
       .merge(
        partdf.alias("s"),
        "s.primary_key_hash = t.primary_key_hash")
      .whenMatchedUpdateAll("s.change_key_hash <> t.change_key_hash")
      .whenNotMatchedInsertAll()
      .whenNotMatchedBySourceDelete()
      .execute()
    )

但是你至少需要使用DBR12.1-它在较低版本中不可用

相关问题