MAX(LastModifytime) as NewWatermarkvalue from data_source_table;
然后在复制活动源和接收器中采取如下图所示的图像 资料来源: 源中的
查询:
select `* from data_source_table where LastModifytime > '@{activity('Lookup1').output.firstRow.WatermarkValue}' and LastModifytime <= '@{activity('Lookup1').output.firstRow.Watermarkvalue}'
1条答案
按热度按时间whlutmcx1#
请参考此链接。使用此链接,您可以从sql中获取新添加的行到数据湖存储中。从我这边重现问题,并能够从管道中获取新添加的记录。
1.在sql存储中创建了两个表,名称分别为data_source_table和watermarktable。
1.创建管道如下所示,
在lookup1中选择数据源表
在lookup2中,按如下所示选择Query
然后在复制活动源和接收器中采取如下图所示的图像
资料来源:
源中的
查询:
Flume:
管道已成功运行,sql表中的数据已加载到数据湖存储文件中。
已在data_source_table中插入新行,并且能够从查找活动
中获取这些记录