如何使用Logic Apps或Azure数据工厂获取Azure blob中文件的Blob索引标签

xkftehaa  于 2023-05-01  发布在  其他
关注(0)|答案(1)|浏览(129)

我试图隔离斑点(文件)使用斑点索引标签。我需要知道如何在逻辑应用程序和Azure数据工厂中执行此操作。
基于这个Blob索引标签,我需要决定是否将文件复制到目标容器。请帮帮我

kpbwa7wx

kpbwa7wx1#

在逻辑应用程序中没有直接的操作。您可以使用Azure函数,其中包含HTTP触发器,您可以在其中执行Get Blob Tags (REST API)。按照下面的步骤,我可以得到想要的结果。
下面是我使用的逻辑应用程序流程

对于每个循环内部的流,我使用XML到JSON转换来应用所需的条件,如下所示。

每个循环的for条件内部的代码视图

"expression": {
                            "and": [
                                {
                                    "not": {
                                        "equals": [
                                            "@body('Xml_to_Json_2')['Tags']['TagSet']",
                                            "@null"
                                        ]
                                    }
                                },
                                {
                                    "equals": [
                                        "@body('Xml_to_Json_2')['Tags']['TagSet']['Tag']['Value']",
                                        "sample"
                                    ]
                                },
                                {
                                    "equals": [
                                        "@body('Xml_to_Json_2')['Tags']['TagSet']['Tag']['Key']",
                                        "id"
                                    ]
                                }
                            ]
                        }

下面是我使用的示例函数代码。

import datetime
import hmac
import hashlib
import base64
import azure.functions as func
import requests

def main(req: func.HttpRequest) -> func.HttpResponse:
    name = req.params.get('name')
    if not name:
        try:
            req_body = req.get_json()
        except ValueError:
            pass
        else:
            name = req_body.get('name')
    STORAGE_ACCOUNT_NAME = '<STORAGEACCOUNTNAME>'
    STORAGE_ACCOUNT_KEY = "<STORAGEACCOUNTKEY>"
    CONTAINER_NAME = '<CONTAINERNAME>'
    REQUEST_VERSION = '2020-04-08'
    REQUEST_DATE = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
    CANONICALIZED_HEADERS = f'x-ms-date:{REQUEST_DATE}\nx-ms-version:{REQUEST_VERSION}\n'
    CANONICALIZED_RESOURCE = f'/{STORAGE_ACCOUNT_NAME}/{CONTAINER_NAME}/{name}\ncomp:tags'

    VERB = 'GET'
    Content_Encoding = ''
    Content_Language = ''
    Content_Length = ''
    Content_MD5 = ''
    Content_Type = ''
    Date = ''
    If_Modified_Since = ''
    If_Match = ''
    If_None_Match = ''
    If_Unmodified_Since = ''
    Range = ''
    CanonicalizedHeaders = CANONICALIZED_HEADERS
    CanonicalizedResource = CANONICALIZED_RESOURCE

    STRING_TO_SIGN = (VERB + '\n' + Content_Encoding + '\n' + Content_Language + '\n' +
                Content_Length + '\n' + Content_MD5 + '\n' + Content_Type + '\n' +
                Date + '\n' + If_Modified_Since + '\n' + If_Match + '\n' +
                If_None_Match + '\n' + If_Unmodified_Since + '\n' + Range + '\n' +
                CanonicalizedHeaders + CanonicalizedResource)

    signature = base64.b64encode(hmac.new(base64.b64decode(STORAGE_ACCOUNT_KEY), msg=STRING_TO_SIGN.encode('utf-8'), digestmod=hashlib.sha256).digest()).decode()
    auth_header = f'SharedKey {STORAGE_ACCOUNT_NAME}:{signature}'

    request_url = f'https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net/{CONTAINER_NAME}/{name}?comp=tags'

    request_headers = {
    'x-ms-date': REQUEST_DATE,
    'x-ms-version': REQUEST_VERSION,
    'Authorization': auth_header
    }

    response = requests.get(request_url, headers=request_headers).text
    
    return func.HttpResponse(response)

结果:

相关问题