在Azure Data Factory中基于文件中的其他标头拆分CSV文件

r55awzrz  于 2023-01-18  发布在  其他
关注(0)|答案(2)|浏览(153)

我目前收到的csv文件格式如下:

Col1, Col2, Col3, Col4
Header1, , ,
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
Header2, , ,
Val1, Val2, Val3, Val4
Header3, , ,
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4

每个标题的行数可以不同,标题可以包含任何单词。
预期结果应为以下之一:选项1:将标题保存到1个文件中的附加列文件:abc/abc/完成_输出

Col1, Col2, Col3, Col4, Col5
Val1, Val2, Val3, Val4, Header1
Val1, Val2, Val3, Val4, Header1
Val1, Val2, Val3, Val4, Header1
Val1, Val2, Val3, Val4, Header2
Val1, Val2, Val3, Val4, Header3
Val1, Val2, Val3, Val4, Header3

选项2:每个标题创建不同的文件:文件1:abc/abc/标题1

Col1, Col2, Col3, Col4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4

文件2:abc/abc/标题2

Col1, Col2, Col3, Col4
Val1, Val2, Val3, Val4

文件3:abc/abc/标题3

Col1, Col2, Col3, Col4
Val1, Val2, Val3, Val4
Val1, Val2, Val3, Val4

应将文件从接收到的格式拆分到不同的文件,或者应将标题行Map到其他列。是否可以在Azure数据工厂中完成此操作(包括数据流选项)?无法访问Databricks群集。
P.S.我知道使用Python脚本很容易做到这一点,但我希望能够在ADF中构建完整的流程。
我尝试基于条件拆分来拆分文件。但是,这不起作用,因为这只允许选择行。只有当行值(之一)给出有关标题的指示时,才可以使用这一方法。
其他的东西我都用不上。
编辑:按要求添加所需的输出选项

9rygscc1

9rygscc11#

  • 您可以通过在数据工厂中使用变量、循环和条件以及复制数据活动来实现这一点。
  • 首先,使用查找活动读取源文件,该活动不带标题、随机行分隔符和列分隔符以及**,但不选择**First row as header选项(这样您将获得如下图所示的输出)。我使用;作为列分隔符,使用|作为行分隔符,并且。

  • 现在,我已经使用了多个设置变量活动,header活动是从查找输出中提取标题(col1,col2,col3,col4)。
@first(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))))

  • each file设置变量activity来存储每个文件的所有数据。我用header变量value初始化了它。

  • get first filename用于使用集合和字符串函数提取第一个文件的名称(本例中为header1)。
@replace(first(skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),1)),',,,','')

  • 现在,取剩下的数据(header1行之后),将其用作for每个循环中的items值,然后做进一步的处理。
@skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),2)

  • 在每一行中,我都有一个if condition活动来检查该行是否为标题(被视为文件名)。因此,我相应地连接了每一行,并根据要求使用复制数据活动。

  • 下面是整个管道JSON(您可以直接使用它,除非您必须创建数据集)。
{
    "name": "pipeline1",
    "properties": {
        "activities": [
            {
                "name": "file as text",
                "type": "Lookup",
                "dependsOn": [],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    },
                    "dataset": {
                        "referenceName": "csv1",
                        "type": "DatasetReference"
                    },
                    "firstRowOnly": false
                }
            },
            {
                "name": "header",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "file as text",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "header",
                    "value": {
                        "value": "@first(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))))",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "each file",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "header",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "each file",
                    "value": {
                        "value": "@variables('header')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "get first filename",
                "type": "SetVariable",
                "dependsOn": [
                    {
                        "activity": "each file",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "variableName": "filename",
                    "value": {
                        "value": "@replace(first(skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),1)),',,,','')",
                        "type": "Expression"
                    }
                }
            },
            {
                "name": "ForEach1",
                "type": "ForEach",
                "dependsOn": [
                    {
                        "activity": "get first filename",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "userProperties": [],
                "typeProperties": {
                    "items": {
                        "value": "@skip(array(split(activity('file as text').output.value[0]['Prop_0'],decodeUriComponent('%0A'))),2)",
                        "type": "Expression"
                    },
                    "isSequential": true,
                    "activities": [
                        {
                            "name": "If Condition1",
                            "type": "IfCondition",
                            "dependsOn": [],
                            "userProperties": [],
                            "typeProperties": {
                                "expression": {
                                    "value": "@contains(item(),',,,')",
                                    "type": "Expression"
                                },
                                "ifFalseActivities": [
                                    {
                                        "name": "each row",
                                        "type": "SetVariable",
                                        "dependsOn": [],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "each row",
                                            "value": {
                                                "value": "@concat(variables('each file'),decodeUriComponent('%0A'),item())",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    {
                                        "name": "complete data",
                                        "type": "SetVariable",
                                        "dependsOn": [
                                            {
                                                "activity": "each row",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "each file",
                                            "value": {
                                                "value": "@variables('each row')",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ],
                                "ifTrueActivities": [
                                    {
                                        "name": "create each file",
                                        "type": "Copy",
                                        "dependsOn": [],
                                        "policy": {
                                            "timeout": "0.12:00:00",
                                            "retry": 0,
                                            "retryIntervalInSeconds": 30,
                                            "secureOutput": false,
                                            "secureInput": false
                                        },
                                        "userProperties": [],
                                        "typeProperties": {
                                            "source": {
                                                "type": "DelimitedTextSource",
                                                "additionalColumns": [
                                                    {
                                                        "name": "req",
                                                        "value": {
                                                            "value": "@variables('each file')",
                                                            "type": "Expression"
                                                        }
                                                    }
                                                ],
                                                "storeSettings": {
                                                    "type": "AzureBlobFSReadSettings",
                                                    "recursive": true,
                                                    "enablePartitionDiscovery": false
                                                },
                                                "formatSettings": {
                                                    "type": "DelimitedTextReadSettings"
                                                }
                                            },
                                            "sink": {
                                                "type": "DelimitedTextSink",
                                                "storeSettings": {
                                                    "type": "AzureBlobFSWriteSettings"
                                                },
                                                "formatSettings": {
                                                    "type": "DelimitedTextWriteSettings",
                                                    "quoteAllText": true,
                                                    "fileExtension": ".txt"
                                                }
                                            },
                                            "enableStaging": false,
                                            "translator": {
                                                "type": "TabularTranslator",
                                                "mappings": [
                                                    {
                                                        "source": {
                                                            "name": "req",
                                                            "type": "String"
                                                        },
                                                        "sink": {
                                                            "type": "String",
                                                            "physicalType": "String",
                                                            "ordinal": 1
                                                        }
                                                    }
                                                ],
                                                "typeConversion": true,
                                                "typeConversionSettings": {
                                                    "allowDataTruncation": true,
                                                    "treatBooleanAsNumber": false
                                                }
                                            }
                                        },
                                        "inputs": [
                                            {
                                                "referenceName": "demo",
                                                "type": "DatasetReference"
                                            }
                                        ],
                                        "outputs": [
                                            {
                                                "referenceName": "op_files",
                                                "type": "DatasetReference",
                                                "parameters": {
                                                    "fileName": {
                                                        "value": "@variables('filename')",
                                                        "type": "Expression"
                                                    }
                                                }
                                            }
                                        ]
                                    },
                                    {
                                        "name": "change filename",
                                        "type": "SetVariable",
                                        "dependsOn": [
                                            {
                                                "activity": "create each file",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "filename",
                                            "value": {
                                                "value": "@replace(item(),',,,','')",
                                                "type": "Expression"
                                            }
                                        }
                                    },
                                    {
                                        "name": "re initialise each file value",
                                        "type": "SetVariable",
                                        "dependsOn": [
                                            {
                                                "activity": "change filename",
                                                "dependencyConditions": [
                                                    "Succeeded"
                                                ]
                                            }
                                        ],
                                        "userProperties": [],
                                        "typeProperties": {
                                            "variableName": "each file",
                                            "value": {
                                                "value": "@variables('header')",
                                                "type": "Expression"
                                            }
                                        }
                                    }
                                ]
                            }
                        }
                    ]
                }
            },
            {
                "name": "for last file within csv",
                "type": "Copy",
                "dependsOn": [
                    {
                        "activity": "ForEach1",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "0.12:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "additionalColumns": [
                            {
                                "name": "req",
                                "value": {
                                    "value": "@variables('each file')",
                                    "type": "Expression"
                                }
                            }
                        ],
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    },
                    "sink": {
                        "type": "DelimitedTextSink",
                        "storeSettings": {
                            "type": "AzureBlobFSWriteSettings"
                        },
                        "formatSettings": {
                            "type": "DelimitedTextWriteSettings",
                            "quoteAllText": true,
                            "fileExtension": ".txt"
                        }
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "mappings": [
                            {
                                "source": {
                                    "name": "req",
                                    "type": "String"
                                },
                                "sink": {
                                    "type": "String",
                                    "physicalType": "String",
                                    "ordinal": 1
                                }
                            }
                        ],
                        "typeConversion": true,
                        "typeConversionSettings": {
                            "allowDataTruncation": true,
                            "treatBooleanAsNumber": false
                        }
                    }
                },
                "inputs": [
                    {
                        "referenceName": "demo",
                        "type": "DatasetReference"
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "op_files",
                        "type": "DatasetReference",
                        "parameters": {
                            "fileName": {
                                "value": "@variables('filename')",
                                "type": "Expression"
                            }
                        }
                    }
                ]
            }
        ],
        "variables": {
            "header": {
                "type": "String"
            },
            "each file": {
                "type": "String"
            },
            "filename": {
                "type": "String"
            },
            "each row": {
                "type": "String"
            }
        },
        "annotations": []
    }
}
  • 对于复制数据,源数据如下所示:

  • 复制数据活动的接收器具有以下数据集配置(源数据集和接收器数据集在2个复制数据活动中相同):

  • 以下是给定样本数据的每个文件的输出:

  • 如果文件以数据(而不是标题)结尾,则将根据需要填充该文件,而不是只填充标题的空文件。
beq87vna

beq87vna2#

如果输入数据集是静态的,则将第二个选项视为要求,然后可以使用以下方法:
1.在源后面添加筛选器转换,表达式为:!startsWith(Col1, 'Header')
1.添加代理键转换以创建增量标识列
1.添加条件拆分转换,将数据拆分为具有以下表达式的三个部分:
x一个月一个月一次x:x一个月二个月一次x一个月三个月一次x:x一个四个一个x一个五个一个x一个六个一个
1.使用选择转换取消选择“Id”列
1.添加sink转换以将数据加载到csv文件

相关问题