使用Azure数据工厂为Postgres DB进行增量数据复制

iszxjhcz  于 2023-05-01  发布在  其他
关注(0)|答案(1)|浏览(139)

我有一个需求,我需要将增量数据从一个postgres DB复制到另一个postgres DB。
我想我会使用水印列的方法,然后使用查找和为每一个来实现这一点,但问题来了,因为在postgres数据库汇,我没有得到一个选项upsert。
如果我的目标是postgres DB,你知道如何实现增量数据加载吗?
我创建了一个pipelien与水印的外观和使用foreach复制数据,但我不知道该怎么做时,汇是Postgress数据库,我没有upsert选项avaiable

1tu0hz3e

1tu0hz3e1#

Azure SQL数据库中没有PostgreSQL接收数据集的upsert选项。要在接收器中插入新记录,可以采用与之前相同的方法。为了更新现有记录的更改,您可以删除现有记录并将该记录插入接收器中。下面是详细的方法。

  • 源表和目标表如下图所示。

  • 创建水印表,其初始值为1900-01-01

  • 然后在ADF中,进行查找活动,并且查找活动中的源数据集用于水印表。

  • 再次进行查找活动,以从源表中获取id字段(主键列)的值。这样做是因为如果相同的id在接收器中存在,则这些行将被删除,并且针对这些id的新行将被插入。

Lookup2中的查询:

select distinct id from src_employee where created_date> '@{activity('Lookup1').output.firstRow.watermark_column}'
  • 使用设置变量活动将查找2的结果存储在字符串类型的变量v1中。

  • 使用设置变量活动来设置另一变量v2。变量v2的值为@{replace(replace(replace(replace(variables('v1'),'{"id":',''),'}',''),'[','('),']',')')}。此表达式删除字符"{"id":"}",并分别用()替换字符[]

  • 然后进行复制活动,在源数据集中,查询被给定为,
select distinct id from src_employee where created_date> '@{activity('Lookup1').output.firstRow.watermark_column}'
  • 在接收数据集设置中,将给出接收数据集,并将复制前活动写成
delete from tgt_employee where id in @{variables('v2')}
  • 然后,您可以使用基于存储水印表的数据集的查找活动或脚本活动,使用新值更新水印表。
update watermark_table
set watermark_column=(select max(created_date) from tgt_employee)

结果:

1.当流水线第一次运行时,数据从源复制到宿,并且watermark_table的watermark_column被更新为2023-04-26 00:00:00.0000000
1.然后,源表中的数据将发生更改。更新id=3的行并插入id=5的新行。

根据源在接收表中更改数据。

相关问题