Azure数据工厂查询Mongo DB以筛选日期

s3fp2yjn  于 2022-11-17  发布在  Go
关注(0)|答案(1)|浏览(172)

我正在从ADFv 2查询mongoDB,并尝试根据日期筛选数据。但是日期以随机数字串的形式出现。我想知道是否有一种方法可以在复制任务的源中动态筛选数据,因为我正在尝试以增量方式加载数据。

[
  {
    "_id": {
      "$oid": "5d5123dc8cf0b4453ceb2088"
    },
    "TemplateId": "5d3ac5c77eb20a2cf4bdf46a",
    "LastUpdate": {
      "$date": 1565603495299
    },
    "Answers": [
      {
        "Question": "Q001c",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,In person"
          ]
        }
      },
      {
        "Question": "Q001a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,Yes"
          ]
        }
      },
      {
        "Question": "Q003a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "0,Yes"
          ]
        }
      },
      {
        "Question": "Q006a",
        "Answer": {
          "_t": "System.String[]",
          "_v": [
            "1,No"
          ]

接收器/目的地已进入SQL数据库。如有任何帮助,将不胜感激。

2hh7jdfx

2hh7jdfx1#

我使用ADF重现了从MongoDb到SQL DB的增量数据加载。正如@Nick.Mc.Dermaid建议的那样,最大时间戳值应该作为增量数据加载的水印值。这里遵循的方法是复制所有日期大于水印值的数据,并用最大日期更新水印值。这样在下一次管道运行时,增量数据将被复制。下面是详细步骤。

  • 在MongoDB API中,将两个文档插入到一个容器中。x1c 0d1x
  • 然后在SQL数据库中,创建水印表,如下图所示,其中watermark value =1000000000000.

    水印的值如上所述设置,以便在第一次运行时,所有数据从源加载到接收器。
  • 在SQL数据库中编写一个存储过程,用最新的日期值更新水印表。
create proc usp_update_watermark_table as
begin
update watermark
set watermarkvalue=(select max(LastUpdate) from tgt_table)
end
  • 在ADF中,执行查找活动,并在select WatermarkValue from watermark中引用水印表

  • 在查找活动之后执行复制活动。在源数据集中,使用MongoDB API,在筛选器中,使用下面的表达式复制大于查找活动值的数据。
{"LastUpdate":{$gt:@{activity('LookupLastWaterMark').output.firstRow.WatermarkValue}}}

  • 在复制活动旁边添加存储过程活动,以便在水位标志表中更新新值x1c4d 1x

  • 在管道运行之后,输出表和水印表被更新。

  • 在源

    中添加新文档

  • 再次触发管道时,只有增量记录被加载到接收器


指令集

相关问题