pyspark 如何从数据库增量文件中删除数据?

rt4zxlrg  于 2022-12-22  发布在  Spark
关注(0)|答案(4)|浏览(192)

我想从数据库中的delta文件中删除数据。
例如:

PR=spark.read.format('delta').options(header=True).load('/mnt/landing/Base_Tables/EventHistory/')
PR.write.format("delta").mode('overwrite').saveAsTable('PR')
spark.sql('delete from PR where PR_Number=4600')

这是从表中删除数据,而不是从实际的增量文件中删除数据。我想删除文件中的数据,而不使用合并操作,因为连接条件不匹配。有人能帮助我解决这个问题吗?
谢啦,谢啦

nfs0ujit

nfs0ujit1#

请记住:Delta中的DELETE不支持子查询。
问题链接:https://github.com/delta-io/delta/issues/730
从文档本身来看,备选方案如下
例如:

DELETE FROM tdelta.productreferencedby_delta 
WHERE  id IN (SELECT KEY 
              FROM   delta.productreferencedby_delta_dup_keys) 
       AND srcloaddate <= '2020-04-15'

在DELTA的情况下,可写为

MERGE INTO delta.productreferencedby_delta AS d 
using (SELECT KEY FROM   tdatamodel_delta.productreferencedby_delta_dup_keys) AS k 
ON d.id = k.KEY 
  AND d.srcloaddate <= '2020-04-15' 
WHEN MATCHED THEN DELETE
6za6bjd0

6za6bjd02#

您可以从Delta表www.example.com中移除与 predicate 匹配的数据https://docs.delta.io/latest/delta-update.html#delete-from-a-table

eblbsuwk

eblbsuwk3#

就像

delete from delta.`/mnt/landing/Base_Tables/EventHistory/` where PR_Number=4600
pkbketx9

pkbketx94#

使用Spark SQL函数可以:

dt_path = "/mnt/landing/Base_Tables/EventHistory/"
my_dt = DeltaTable.forPath(spark, dt_path)
seq_keys = ["4600"] // You could add here as many as you want
my_dt.delete(col("PR_Number").isin(seq_keys))

在scala中:

val dt_path = "/mnt/landing/Base_Tables/EventHistory/"
val my_dt : DeltaTable = DeltaTable.forPath(spark, dt_path)
val seq_keys = Seq("4600") // You could add here as many as you want
my_dt.delete(col("PR_Number").isin(seq_keys:_*))

相关问题