MapJSON列Azure数据工厂

j2cgzkjk  于 2023-03-24  发布在  其他
关注(0)|答案(1)|浏览(148)

我正在尝试将以下请求Map到SQL数据库https://api.exchangerate.host/timeseries?start_date=2023-01-01&end_date=2023-01-10&base=USD&symbols=MXN&format=json
问题是,日期是不断变化的,我需要以某种方式Map他们与某种通配符。我不能使用这种特殊情况下的数据流。

最后,我需要两个具有预定义整数的自定义列,即日期和汇率值

lawou6xi

lawou6xi1#

最后,我需要两个具有预定义整数的自定义列,即日期和汇率值
通过使用您的请求,我能够使用split和for循环以及openjson()来完成您的需求。
问题是,日期是不断变化的,我需要以某种方式Map他们与某种通配符
由于日期是变化的,你不能动态地访问这些值。为了动态地获取它,首先我在一个web活动中接受了你的请求,并在rates对象的字符串上使用了一个拆分,其中包含':{"MXN":'和以下动态内容。
@split(string(activity('Web1').output.rates),':{"MXN":')
我把这个数组存储在一个数组变量中。

然后,我通过跳过最后一个值将这个数组传递给ForEach,并在下面的动态内容中。
@take(variables('split_arr'),sub(length(variables('split_arr')),1))
我使用这个ForEach来构建一个JSON数组,其中每个索引中的对象都是date和rate。为此,我在forEach中使用了一个追加变量activity到json_arr数组变量,其中包含以下动态内容。

@json(concat('{"date":"',substring(item(),sub(length(item()),11), 10),'","rate":',activity('Web1').output.rates[substring(item(),sub(length(item()),11), 10)].MXN,'}'))

它将给予如下所示的JSON数组。

使用这个JSON数组和SQL作为源来复制活动,并在查询中使用openjson(),如下所示。

declare @json nvarchar(max) = N'@{variables('json_arr')}';
SELECT date,rate FROM OPENJSON(@json) WITH (
date varchar(max),
rate decimal(8,6)
);

在复制活动的接收器中,根据您的要求给予您的接收器。在这里,我使用了一个SQL表。

以下是我的pipeline JSON,供大家参考:

{
"name": "pipeline2",
"properties": {
    "activities": [
        {
            "name": "Web1",
            "type": "WebActivity",
            "dependsOn": [],
            "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "url": "https://api.exchangerate.host/timeseries?start_date=2023-01-01&end_date=2023-01-10&base=USD&symbols=MXN&format=json",
                "method": "GET"
            }
        },
        {
            "name": "Set variable1",
            "type": "SetVariable",
            "dependsOn": [
                {
                    "activity": "Web1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "userProperties": [],
            "typeProperties": {
                "variableName": "split_arr",
                "value": {
                    "value": "@split(string(activity('Web1').output.rates),':{\"MXN\":')",
                    "type": "Expression"
                }
            }
        },
        {
            "name": "ForEach1",
            "type": "ForEach",
            "dependsOn": [
                {
                    "activity": "Set variable1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "userProperties": [],
            "typeProperties": {
                "items": {
                    "value": "@take(variables('split_arr'),sub(length(variables('split_arr')),1))",
                    "type": "Expression"
                },
                "isSequential": true,
                "activities": [
                    {
                        "name": "Append variable1",
                        "type": "AppendVariable",
                        "dependsOn": [],
                        "userProperties": [],
                        "typeProperties": {
                            "variableName": "json_arr",
                            "value": {
                                "value": "@json(concat('{\"date\":\"',substring(item(),sub(length(item()),11), 10),'\",\"rate\":',activity('Web1').output.rates[substring(item(),sub(length(item()),11), 10)].MXN,'}'))",
                                "type": "Expression"
                            }
                        }
                    }
                ]
            }
        },
        {
            "name": "Set variable2",
            "type": "SetVariable",
            "dependsOn": [
                {
                    "activity": "ForEach1",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "userProperties": [],
            "typeProperties": {
                "variableName": "json_arr1",
                "value": {
                    "value": "@variables('json_arr')",
                    "type": "Expression"
                }
            }
        },
        {
            "name": "Copy data1",
            "type": "Copy",
            "dependsOn": [
                {
                    "activity": "Set variable2",
                    "dependencyConditions": [
                        "Succeeded"
                    ]
                }
            ],
            "policy": {
                "timeout": "0.12:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
            },
            "userProperties": [],
            "typeProperties": {
                "source": {
                    "type": "AzureSqlSource",
                    "sqlReaderQuery": {
                        "value": "declare @json nvarchar(max) = N'@{variables('json_arr')}';\n\nSELECT date,rate FROM OPENJSON(@json) WITH (\n   date varchar(max),\n\trate decimal(8,6)\n);",
                        "type": "Expression"
                    },
                    "queryTimeout": "02:00:00",
                    "partitionOption": "None"
                },
                "sink": {
                    "type": "AzureSqlSink",
                    "writeBehavior": "insert",
                    "sqlWriterUseTableLock": false,
                    "tableOption": "autoCreate",
                    "disableMetricsCollection": false
                },
                "enableStaging": false,
                "translator": {
                    "type": "TabularTranslator",
                    "typeConversion": true,
                    "typeConversionSettings": {
                        "allowDataTruncation": true,
                        "treatBooleanAsNumber": false
                    }
                }
            },
            "inputs": [
                {
                    "referenceName": "AzureSqlTable1",
                    "type": "DatasetReference"
                }
            ],
            "outputs": [
                {
                    "referenceName": "targetsql",
                    "type": "DatasetReference"
                }
            ]
        }
    ],
    "variables": {
        "split_arr": {
            "type": "Array"
        },
        "json_arr": {
            "type": "Array"
        },
        "json_arr1": {
            "type": "Array"
        }
    },
    "annotations": [],
    "lastPublishTime": "2023-03-20T16:55:11Z"
},
"type": "Microsoft.DataFactory/factories/pipelines"
}

结果:

相关问题