json ADF -数据流中的复杂嵌套数组

xwmevbvl  于 2023-08-08  发布在  其他
关注(0)|答案(1)|浏览(123)

我刚开始在ADF中使用数据流,所以想请大家帮忙。情况是这样的:
我想在数据流中转换一个来自REST API的JSON文件,以生成表格/逻辑表结构。转换后,我想将这些数据及其结构发送到Azure SQL数据库。

以下是我所做的概述:

创建了一个管道,将数据从网站复制到我的数据湖。
x1c 0d1x的数据



这是输入源:

{
    "id":[
        {
            "value":40051
        }
    ],
    "uuid":[
        {
            "value":"0ca12ac9-d94b-44cf-a35b-8b7256006cf8"
        }
    ],
    "revision_id":[
        {
            "value":1452381
        }
    ],
    "langcode":[
        {
            "value":"nl"
        }
    ],
    "type":[
        {
            "target_id":"par_chart",
            "target_type":"paragraphs_type",
            "target_uuid":"2c3143a2-bd78-4b4d-afb6-19160de928f2"
        }
    ],
    "status":[
        {
            "value":true
        }
    ],
    "created":[
        {
            "value":"2019-10-17T12:08:05+00:00",
            "format":"Y-m-d\\TH:i:sP"
        }
    ],
    "parent_id":[
        {
            "value":"2561"
        }
    ],
    "parent_type":[
        {
            "value":"node"
        }
    ],
    "parent_field_name":[
        {
            "value":"field_paragraphs"
        }
    ],
    "behavior_settings":[
        {
            "value":[
                
            ]
        }
    ],
    "default_langcode":[
        {
            "value":true
        }
    ],
    "revision_translation_affected":[
        {
            "value":true
        }
    ],
    "content_translation_source":[
        {
            "value":"und"
        }
    ],
    "content_translation_outdated":[
        {
            "value":false
        }
    ],
    "content_translation_changed":[
        {
            "value":"2023-05-09T09:48:38+00:00",
            "format":"Y-m-d\\TH:i:sP"
        }
    ],
    "field_par_chart":[
        {
            "csv":"[[\"\",\"Percentage behandeling\",\"Percentage behandeling\",\"Percentage behandeling\"],[\"2010\",\"19.4\",null,\"\"],[\"2011\",\"16.6\",null,\"\"],[\"2012\",\"15.4\",null,\"\"],[\"2013\",\"13.5\",null,\"\"],[\"2014\",\"13\",null,\"\"],[\"2015\",\"13\",null,\"\"],[\"2016\",\"14.1\",null,\"\"],[\"2017\",\"17.7\",null,\"\"],[\"2018\",\"24\",null,\"\"],[\"2019\",\"\",\"27.7\",\"\"],[\"2020\",\"\",\"31.9\",\"\"],[\"2021*\",\"\",\"\",\"42.9\"],[\"2022*\",null,\"\",\"46.2\"]]",
            "csv_url":"",
            "config":"{\"chart\":{\"type\":\"line\",\"renderTo\":{\"hcEvents\":{\"mousedown\":[{\"order\":null}],\"touchstart\":[{\"order\":null}],\"mouseover\":[{\"order\":null}],\"mouseout\":[{\"order\":null}]},\"__EV_STORE_KEY@7\":{}}},\"xAxis\":[{\"type\":\"category\",\"index\":0,\"isX\":true}],\"yAxis\":[{\"title\":{\"text\":\"Percentage\",\"offset\":-81.859375},\"min\":0,\"tickInterval\":5,\"index\":0,\"events\":{}}],\"templateName\":\"lineBasic\",\"series\":[{\"type\":null,\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":19.4},{\"name\":\"2011\",\"y\":16.6},{\"name\":\"2012\",\"y\":15.4},{\"name\":\"2013\",\"y\":13.5},{\"name\":\"2014\",\"y\":13},{\"name\":\"2015\",\"y\":13},{\"name\":\"2016\",\"y\":14.1},{\"name\":\"2017\",\"y\":17.7},{\"name\":\"2018\",\"y\":24},{\"name\":\"2019\",\"y\":null},{\"name\":\"2020\",\"y\":null},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\",\"_colorIndex\":0,\"_symbolIndex\":0},{\"type\":\"line\",\"animation\":false,\"data\":[{\"name\":\"2010\",\"y\":null},{\"name\":\"2011\",\"y\":null},{\"name\":\"2012\",\"y\":null},{\"name\":\"2013\",\"y\":null},{\"name\":\"2014\",\"y\":null},{\"name\":\"2015\",\"y\":null},{\"name\":\"2016\",\"y\":null},{\"name\":\"2017\",\"y\":null},{\"name\":\"2018\",\"y\":null},{\"name\":\"2019\",\"y\":27.7},{\"name\":\"2020\",\"y\":31.9},{\"name\":\"2021*\",\"y\":null},{\"name\":\"2022*\",\"y\":null}],\"name\":\"Percentage behandeling\"},{\"type\":\"line\",\"animation\":false,\"data\":[{\"y\":null,\"name\":\"2010\"},{\"y\":null,\"name\":\"2011\"},{\"y\":null,\"name\":\"2012\"},{\"y\":null,\"name\":\"2013\"},{\"y\":null,\"name\":\"2014\"},{\"y\":null,\"name\":\"2015\"},{\"y\":null,\"name\":\"2016\"},{\"y\":null,\"name\":\"2017\"},{\"y\":null,\"name\":\"2018\"},{\"y\":null,\"name\":\"2019\"},{\"y\":null,\"name\":\"2020\"},{\"y\":42.9,\"name\":\"2021*\"},{\"y\":46.2,\"name\":\"2022*\"}],\"name\":\"Percentage behandeling\"}],\"title\":{\"text\":\"Trend in wachttijden voor behandeling in ziekenhuis langer dan de Treeknorm\"},\"legend\":{\"enabled\":false}}"
        }
    ],
    "field_par_extra_info":[
        {
            "value":"singlecard"
        }
    ],
    "field_par_hidden":[
        {
            "value":false
        }
    ],
    "field_par_text":[
        {
            "value":"<ul>\r\n\t<li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\r\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus&nbsp;2021 t/m december&nbsp;2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\r\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\r\n</ul>\r\n\r\n<p><strong>Bron&nbsp;</strong>&nbsp;<br />\r\nWachttijdenonderzoek, Mediquest<br />\r\nWachttijdenregistratie NZa<br />\r\n<strong>Verslagjaar&nbsp; t/m</strong><br />\r\n2022<br />\r\n<strong>Laatste update gegevens&nbsp;</strong><br />\r\n24 mei 2023<br />\r\n<strong>Updatefrequentie&nbsp;</strong><br />\r\nJaarlijks<br />\r\n<strong>Meer info</strong><br />\r\n<a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\r\n",
            "format":"volledige_html",
            "processed":"<ul><li><em>Door meerdere veranderingen van de bron en methode zijn de percentages van 2010-2018, 2019-2020 en 2021-2022 niet goed met elkaar te vergelijken.</em></li>\n\t<li><em><span><span><span>De wachttijden van 2021 zijn enkel gebaseerd op wachttijden gemeten van augustus 2021 t/m december 2021 vanwege een verandering in de methode. In de eerste helft van 2021 was 28,7% van de wachttijden voor behandeling langer dan de treeknorm.</span></span></span></em></li>\n\t<li><em>In 2020 brak de coronapandemie uit. Hiermee moet rekening worden gehouden bij het interpreteren van de trends.</em></li>\n</ul><p><strong>Bron </strong> <br />\nWachttijdenonderzoek, Mediquest<br />\nWachttijdenregistratie NZa<br /><strong>Verslagjaar  t/m</strong><br />\n2022<br /><strong>Laatste update gegevens </strong><br />\n24 mei 2023<br /><strong>Updatefrequentie </strong><br />\nJaarlijks<br /><strong>Meer info</strong><br /><a href=\"https://www.vzinfo.nl/prestatie-indicatoren/wachttijd-langer-dan-treeknorm-behandeling\" target=\"_blank\">Prestatie-indicatoren gezondheidszorg op VZinfo.nl</a></p>\n\n"
        }
    ],
    "field_par_text_bgcolor":[
        {
            "value":"bg-gray-lightest"
        }
    ],
    "field_par_text_position":[
        {
            "value":"below"
        }
    ],
    "field_par_title":[
        {
            "value":"Trend "
        }
    ],
    "field_par_title_class":[
        
    ],
    "field_par_title_enable":[
        {
            "value":false
        }
    ],
    "field_par_title_tag":[
        {
            "value":"h4"
        }
    ]
}

字符串
这是ADF Pipeline的接收器部分:
它以.json文件的形式保存在数据湖中。


**重要信息:我只需要转换JSON中的这一部分:**`field_par_chart`中的`csv`元素



预期的数据结构必须如下所示(见下图),并保存为文本文件。
x1c4d 1x型
如何使用ADF管道和/或ADF数据流实现此功能,有何建议?
非常感谢您的时间和努力!

kq4fsx7k

kq4fsx7k1#

如果您的目标列是有限的,并且列名是已知的,那么您可以尝试下面的方法。
在您预期的输出中,有3列的名称相同,而ADF和SQL都不支持重复的列名。因此,我忽略了field_par_chart字段中的第一个数组(列名数组)。
由于您只需要field_par_chart字段中的数据,因此可以使用select转换从源中删除其余字段。


的数据
然后我使用了3个派生列转换。

派生列1:

拆分字符串,生成数组的数组(array of arrays),表达式如下:

map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'"',''),']',''),','))

字符串


派生列2:
由于列名相同,这一步跳过第一个子数组(列名数组),然后使用下面的表达式展开其余子数组,将每个子数组转换为一行。

unfold(slice(arr,2))


派生列3:

它从行数组生成所需的列。在这里,我手动给出列名,并将值从string转换为double。您可以手动指定任何想要的列名。



在sink中,指定您的目标SQL表,只为这4列配置映射(mapping),并删除之前的转换中产生的多余列。


结果



通过管道执行此数据流,您可以将此数据加载到目标SQL表中。

我的Dataflow JSON供您参考:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "Json1",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "AzureSqlTable1",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "derivedColumn2"
                },
                {
                    "name": "derivedColumn3"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          id as (value as integer)[],",
                "          uuid as (value as string)[],",
                "          revision_id as (value as integer)[],",
                "          langcode as (value as string)[],",
                "          type as (target_id as string, target_type as string, target_uuid as string)[],",
                "          status as (value as boolean)[],",
                "          created as (value as string, format as string)[],",
                "          parent_id as (value as string)[],",
                "          parent_type as (value as string)[],",
                "          parent_field_name as (value as string)[],",
                "          behavior_settings as (value as string[])[],",
                "          default_langcode as (value as boolean)[],",
                "          revision_translation_affected as (value as boolean)[],",
                "          content_translation_source as (value as string)[],",
                "          content_translation_outdated as (value as boolean)[],",
                "          content_translation_changed as (value as string, format as string)[],",
                "          field_par_chart as (csv as string, csv_url as string, config as string)[],",
                "          field_par_extra_info as (value as string)[],",
                "          field_par_hidden as (value as boolean)[],",
                "          field_par_text as (value as string, format as string, processed as string)[],",
                "          field_par_text_bgcolor as (value as string)[],",
                "          field_par_text_position as (value as string)[],",
                "          field_par_title as (value as string)[],",
                "          field_par_title_class as string[],",
                "          field_par_title_enable as (value as boolean)[],",
                "          field_par_title_tag as (value as string)[]",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'singleDocument') ~> source1",
                "select1 derive(arr = map(split(replace(field_par_chart_csv,'[',''),'],'),split(replace(replace(#item,'\"',''),']',''),','))) ~> derivedColumn1",
                "source1 select(mapColumn(",
                "          field_par_chart_csv = field_par_chart[1].csv",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "derivedColumn1 derive(new_arr = unfold(slice(arr,2))) ~> derivedColumn2",
                "derivedColumn2 derive(Category = new_arr[1],",
                "          {Percentage behandeling1} = toDouble(new_arr[2]),",
                "          {Percentage behandeling2} = toDouble(new_arr[3]),",
                "          {Percentage behandeling3} = toDouble(new_arr[4])) ~> derivedColumn3",
                "derivedColumn3 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     deletable:false,",
                "     insertable:true,",
                "     updateable:false,",
                "     upsertable:false,",
                "     recreate:true,",
                "     format: 'table',",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     errorHandlingOption: 'stopOnFirstError',",
                "     mapColumn(",
                "          Category,",
                "          {Percentage behandeling1},",
                "          {Percentage behandeling2},",
                "          {Percentage behandeling3}",
                "     )) ~> sink1"
            ]
        }
    }
}

相关问题