合并到deltalake表更新所有行

6ju8rftf  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(414)

我正在尝试使用sparkDataframe更新deltalake表。我要做的是更新sparkDataframe中与deltalake表中不同的所有行,并插入deltalake表中缺少的所有行。
我试着这样做:

import io.delta.tables._

val not_equal_string = df.schema.fieldNames.map(fn => 
    s"coalesce(not ((updates.${fn} = history.${fn}) or (isnull(history.${fn}) and isnull(updates.${fn})) ),false)"
    ).reduceLeft((x,y) => s"$x OR $y ")

val deltaTable = DeltaTable.forPath(spark, "s3a://sparkdata/delta-table")

deltaTable.as("history").merge(
    df.as("updates"), "updates.EquipmentKey = history.EquipmentKey"
).whenMatched(not_equal_string).updateAll().whenNotMatched().insertAll().execute()

这是可行的,但当我查看生成的delta表时,我发现即使我没有更新一条记录,它的大小也实际上增加了一倍。生成了一个新的json文件,其中包含一个remove for every old partition和一个add with all new partitions。
当我只是以whenmatched条件作为where条件运行一个sql连接时,我没有得到一行。
我希望在这样的合并操作之后delta表不会被触动。我错过了一些简单的东西吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题