我尝试了下面我能想到的最佳方法,利用指定的主键在现有数据(targetdf)上向上插入新数据(在sourcedf中)。
val primaryKeyList = List("header1")
val primaryKeyRowsToDropFromTarget =
targetDF.select(primaryKeyList.head, primaryKeyList.tail:_*)
.intersect(sourceDF.select(primaryKeyList.head, primaryKeyList.tail:_*))
.withColumn("dropColumnFlag",lit("yes"))
val upsertedDF =
targetDF.join(primaryKeyRowsToDropFromTarget , primaryKeyList.toSeq , "left_outer")
.filter("dropColumnFlag is null").drop("dropColumnFlag")
.union(sourceDF)
上述方法遵循以下步骤顺序:
标识主键列或列的组合以获取复合键。
标识要从主键列与sourcedf匹配的targetdf中删除的行。并在这些行中附加一个标志“yes”。
在targetdf上使用left outer join来用将被更新的标志标记这些行。
过滤掉不更新的未标记的targetdf行并删除flag列。
最后,将筛选出的targetdf数据与sourcedf数据合并。
以下示例将显示sourcedf、targetdf和预期upserteddf:
targetDF:
header1,header2,header3
1,A,10
2,B,14
3,C,17
4,D,32
5,E,232
sourceDF
header1,header2,header3
6,F,102
7,G,141
8,H,175
9,J,103
10,L,141
3,X,112
1,Z,90
upsertedDF
header1,header2,header3
2,B,14
4,D,32
5,E,232
6,F,102
7,G,141
8,H,175
9,J,103
10,L,141
3,X,112
1,Z,90
是否有优化的方法或逻辑来达到相同的结果?我将处理超过10-15gbs量级的数据来应用这个逻辑。任何帮助都将不胜感激。
1条答案
按热度按时间y4ekin9u1#
尝试
except
,left_anti
以及unionAll
功能。例子: