我有不断变化的数据来源。我正在通过sqoop拉取数据,但由于卷很大,我无法将其作为每日截断负载。我想附加数据,但逻辑应该是更新和插入。如果记录在源中通过删除以前的相同记录进行更新,则应在配置单元中执行相同的操作,即删除旧记录并插入/更新新记录。下面就是一个这样的例子。
一段时间后,比如说30分钟,数据更新如下:
现在,我的配置单元表拾取原始记录,并在一段时间后拾取更新的记录,但将其作为另一行插入。
我希望数据与源中的数据一样反映出来,而不覆盖我的表((建议使用Pypark代码)
请帮忙。谢谢您。
2条答案
按热度按时间oyjwcjzk1#
您可以在源中添加更多列作为last\u modified,也可以在配置单元中添加作为last\u load,并且在下次上载时,可以在(源中的id列和配置单元中的id列)和(源表的last\u modified和配置单元列的last\u update)上指定两个条件。
1zmg4dgp2#
不提供查询,但给出如何实现这一点的想法:
在源和实际配置单元表之间创建一个临时表,其中包含所有记录(插入和更新)。
要获得实际的配置单元表,请使用rank函数,例如:
注意:根据您的数据量,hive\u staging表可能会增长,因此您需要相应地添加分区/存储桶。