How do I get the full file paths in Azure Synapse Analytics and pass them to a ForEach activity?

goucqfw6 posted on 2023-05-01 in Other

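I need to process all the data contained in ADLS Gen2. The data is stored as Parquet files rather than in a database, and the folder structure is as shown below.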

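The processing is the same for every file, but each processed file must be overwritten in the same location. I plan to achieve this as follows:
1. Use a Get Metadata activity to get the full file paths of all the files.
2. Pass the file paths one by one to a ForEach activity, so that each file is processed and stored back in its original location.
However, the Get Metadata activity cannot retrieve the file paths, because wildcards are not allowed in the source dataset. How can I do this?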

tf7tbtn2 #1

The Get Metadata activity does not return the full folder path, even if you define wildcard placeholders.

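If the total number of rows is small, you can use a Copy activity with an additional column holding the file path, merge all the files, and save the result to a temporary location. Then read that column with a Lookup activity and take a union of it to get the unique list. However, a Lookup does not return more than 5000 rows.
So I tried the approach below to get all the file paths in an array.
First, I generated an array of folder paths (by date) with a SQL script in a Lookup activity.
If you don't want to use a SQL database script, you can use an Until activity instead and build the list of dated folder paths by initializing a start date and an end date.
To begin, determine the start date and end date of the folders from the folder paths, and assign those values to pipeline parameters.
Here is my folder structure: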

inputdata
    year=2023
        month=04
            day=20
                File01.parquet
            day=21
                File01.parquet
                File02.parquet
            day=22
                File01.parquet
                File02.parquet
            day=23
                File01.parquet
                File02.parquet
            day=24
                File01.parquet
                File02.parquet

Based on my folder structure, 2023/04/20 and 2023/04/24 are the start date and end date, respectively.

I used the following SQL script, taken from this SO answer by @Joseph Xu, in the Lookup to get the array of folder paths.

;with dtable as 
(
    select CONVERT(varchar(100),'@{pipeline().parameters.folders_s_date}', 111) as mydate
union all 
    select CONVERT(varchar(100), DATEADD(day,1,mydate), 111) from dtable
    where datediff(day,CONVERT(varchar(100), DATEADD(day,1,mydate), 111),'@{pipeline().parameters.folders_end_date}')>=0
) 

select concat('inputdata/year=',year(mydate),'/month=',PARSENAME( REPLACE(mydate,'/','.'),2),'/day=',PARSENAME( REPLACE(mydate,'/','.'),1)) as path from dtable option(MAXRECURSION 0)

This gives an array of folder paths like the following:
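
To make the expected shape of that array concrete, here is a minimal Python sketch of the same date-range expansion. It is only an illustration under assumptions, not part of the pipeline: the function name folder_paths and the root argument are hypothetical, and the rows mimic a Lookup output with firstRowOnly set to false, where each row is an object with a path key.

```python
from datetime import date, timedelta


def folder_paths(start, end, root="inputdata"):
    """Return one {"path": ...} row per day from start to end, inclusive.

    Hypothetical helper: it mirrors what the recursive SQL query produces,
    with zero-padded month and day values.
    """
    rows = []
    current = start
    while current <= end:
        rows.append({
            "path": f"{root}/year={current.year}"
                    f"/month={current.month:02d}/day={current.day:02d}"
        })
        current += timedelta(days=1)
    return rows


# Same start and end dates as the pipeline parameter defaults.
paths = folder_paths(date(2023, 4, 20), date(2023, 4, 24))
for row in paths:
    print(row["path"])
```

Each printed line corresponds to what @item().path would hold for one iteration of the ForEach.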

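Now pass this array to a ForEach and, inside the ForEach, use a Get Metadata activity with a Parquet dataset. Using a dataset parameter, supply @item().path in the Get Metadata activity to get the child items (the Parquet files).
ADF currently does not support nested ForEach activities, so to iterate over the child items I used a second pipeline, called through an Execute Pipeline activity, and passed the child items array to it.
In the child pipeline, append each child item's file name to the folder path to build an array. Join this array with _ and return the result as a string.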
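
As a rough model of what the child pipeline does, the Python sketch below appends each file name to the folder path and joins the results with _. The function name child_pipeline is hypothetical, and the sample childitems list is an assumption that follows the name/type shape Get Metadata returns for childItems.

```python
def child_pipeline(folderpath, childitems, sep="_"):
    """Model of the child pipeline: build full paths, then join them.

    Hypothetical helper; in ADF this is a ForEach with an Append Variable
    activity followed by a join() expression.
    """
    child_paths = []
    for item in childitems:
        # Equivalent of @concat(pipeline().parameters.folderpath,'/',item().name)
        child_paths.append(f"{folderpath}/{item['name']}")
    # Equivalent of @join(variables('child_paths'),'_')
    return sep.join(child_paths)


# Assumed sample input for one day folder.
childitems = [
    {"name": "File01.parquet", "type": "File"},
    {"name": "File02.parquet", "type": "File"},
]
returns = child_pipeline("inputdata/year=2023/month=04/day=21", childitems)
print(returns)
```

Note that this scheme relies on _ never appearing in a folder or file name. If your names can contain underscores, a different separator would likely be needed.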

After the Execute Pipeline activity, I concatenate the returned result onto a string variable, as shown below.

Outside the ForEach, I used split to break the string on _ into an array and stored it in a variable.
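
The accumulate-then-split logic can be sketched in Python as follows. This is an illustration under assumptions: the per_folder_returns list stands in for the strings returned by the child pipeline, and the two-step assignment through temp reflects that, as far as I know, a Set Variable activity cannot reference the variable it is setting.

```python
# Assumed sample values: one joined string per day folder.
per_folder_returns = [
    "inputdata/year=2023/month=04/day=20/File01.parquet",
    "inputdata/year=2023/month=04/day=21/File01.parquet"
    "_inputdata/year=2023/month=04/day=21/File02.parquet",
]

paths_string = ""  # the String variable starts out empty
for returned in per_folder_returns:
    # Equivalent of @concat(variables('paths_string'),'_', <child return value>)
    temp = paths_string + "_" + returned
    # Equivalent of the second Set Variable: copy temp back
    paths_string = temp

# Equivalent of @skip(split(variables('paths_string'),'_'),1):
# the string starts with '_', so the first element after the split is empty.
final_paths = paths_string.split("_")[1:]
for path in final_paths:
    print(path)
```

The leading _ is why the first element is skipped: without the skip, the resulting array would begin with an empty string.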

Parent pipeline JSON:

{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "Lookup to get dates structure",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "AzureSqlSource",
                        "sqlReaderQuery": {
                            "value": ";with dtable as \n(\n    select CONVERT(varchar(100),'@{pipeline().parameters.folders_s_date}', 111) as mydate\nunion all \n    select CONVERT(varchar(100), DATEADD(day,1,mydate), 111) from dtable\n    where datediff(day,CONVERT(varchar(100), DATEADD(day,1,mydate), 111),'@{pipeline().parameters.folders_end_date}')>=0\n) \n\nselect concat('inputdata/year=',year(mydate),'/month=',PARSENAME( REPLACE(mydate,'/','.'),2),'/day=',PARSENAME( REPLACE(mydate,'/','.'),1)) as path from dtable option(MAXRECURSION 0)\n",
                            "type": "Expression"
                        },
                        "queryTimeout": "02:00:00",
                        "partitionOption": "None"
                    },
                    "dataset": {
                        "referenceName": "SQLdataset",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "Itertate through folder paths",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "Lookup to get dates structure",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@activity('Lookup to get dates structure').output.value",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Get Metadata1",
                            "type": "GetMetadata",
                            "dependsOn": [],
                            "policy": {
                                "timeout": "0.12:00:00",
                                "retry": 0,
                                "retryIntervalInSeconds": 30,
                                "secureOutput": false,
                                "secureInput": false
                            },
                            "userProperties": [],
                            "typeProperties": {
                                "dataset": {
                                    "referenceName": "sourcefiles",
                                    "type": "DatasetReference",
                                    "parameters": {
                                        "folderpath": {
                                            "value": "@item().path",
                                            "type": "Expression"
                                        }
                                    }
                                },
                                "fieldList": [
                                    "childItems"
                                ],
                                "storeSettings": {
                                    "type": "AzureBlobFSReadSettings",
                                    "enablePartitionDiscovery": false
                                },
                                "formatSettings": {
                                    "type": "ParquetReadSettings"
                                }
                            }
                        },
                        {
                            "name": "Child pipeline for file paths",
                            "type": "ExecutePipeline",
                            "dependsOn": [
                                {
                                    "activity": "Get Metadata1",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "pipeline": {
                                    "referenceName": "child",
                                    "type": "PipelineReference"
                                },
                                "waitOnCompletion": true,
                                "parameters": {
                                    "folderpath": {
                                        "value": "@item().path",
                                        "type": "Expression"
                                    },
                                    "childitems": {
                                        "value": "@activity('Get Metadata1').output.childItems",
                                        "type": "Expression"
                                    }
                                }
                            }
                        },
                        {
                            "name": "store return and concat in temp",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "Child pipeline for file paths",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "temp",
                                "value": {
                                    "value": "@concat(variables('paths_string'),'_',activity('Child pipeline for file paths').output.pipelineReturnValue.returns)",
                                    "type": "Expression"
                                }
                            }
                        },
                        {
                            "name": "assign temp to path strings",
                            "type": "SetVariable",
                            "dependsOn": [
                                {
                                    "activity": "store return and concat in temp",
                                    "dependencyConditions": [
                                        "Succeeded"
                                    ]
                                }
                            ],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "paths_string",
                                "value": {
                                    "value": "@variables('temp')",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "split paths to array",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "Itertate through folder paths",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "final_paths",
                    "value": {
                        "value": "@skip(split(variables('paths_string'),'_'),1)",
                        "type": "Expression"
                    }
                }
            }
        ],
        "parameters": {
            "folders_s_date": {
                "type": "string",
                "defaultValue": "2023/04/20"
            },
            "folders_end_date": {
                "type": "string",
                "defaultValue": "2023/04/24"
            }
        },
        "variables": {
            "final_paths": {
                "type": "Array"
            },
            "temp": {
                "type": "String"
            },
            "paths_string": {
                "type": "String"
            }
        },
        "annotations": []
    }
}

Child pipeline JSON:

{
    "name": "child",
    "properties": {
        "activities": [
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@pipeline().parameters.childitems",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "Append variable1",
                            "type": "AppendVariable",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "variableName": "child_paths",
                                "value": {
                                    "value": "@concat(pipeline().parameters.folderpath,'/',item().name)",
                                    "type": "Expression"
                                }
                            }
                        }
                    ]
                }
            },
            {
                "name": "Set variable1",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "pipelineReturnValue",
                    "value": [
                        {
                            "key": "returns",
                            "value": {
                                "type": "Expression",
                                "content": "@join(variables('child_paths'),'_')"
                            }
                        }
                    ],
                    "setSystemVariable": true
                }
            }
        ],
        "parameters": {
            "folderpath": {
                "type": "string"
            },
            "childitems": {
                "type": "array"
            }
        },
        "variables": {
            "child_paths": {
                "type": "Array"
            }
        },
        "annotations": []
    }
}

Resulting array of paths:

You can pass this array to a ForEach to achieve your requirement.
