ADF:使用复制数据活动将合并后的JSON文件解析为CSV文件

k75qkfdt  于 2023-09-27  发布在  其他
关注(0)|答案(1)|浏览(108)

我调用了一些API,并通过ForEach循环把每个响应保存到单独的文件中。然后我使用一个复制数据活动,把循环中创建的所有JSON文件合并成一个JSON文件。现在的问题是,我想把所有的rows数组项保存为一张表。到目前为止,我只能提取每个API生成的JSON中rows数组的第一项,但我需要取得合并文件中的所有rows数组项。
合并后的API JSON文件如下所示:

[{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[["EC2PF_Part3_TopAnlagen","Succeeded","2023-08-15T11:32:29.120704Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateECountAnlagenExtended","Succeeded","2023-08-15T11:35:31.284712Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateFlxresEinheiten","Succeeded","2023-08-15T11:37:17.255339Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateECountFlxresExtended","Succeeded","2023-08-15T11:39:18.948178Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateGINIUSvsECount","Succeeded","2023-08-15T11:44:02.991948Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateGINIUSvsSoptimEKM","Succeeded","2023-08-15T11:48:48.04538Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CurateSoptimEKMEEGAnlagen","Succeeded","2023-08-15T11:56:02.22626Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["AllDataflows","Succeeded","2023-08-15T11:56:05.46919Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"]]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[["csvToParquet","Succeeded","2023-09-07T06:33:27.15381Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["csvToParquet","Succeeded","2023-09-07T06:39:14.272804Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["csvToParquet","Succeeded","2023-09-07T11:41:41.178384Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-07T13:02:09.790652Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-07T13:41:50.799101Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-07T14:02:05.363832Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-07T14:08:32.283883Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-07T14:14:22.496675Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-07T14:33:41.980251Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"]]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[["LogAnaytics","Succeeded","2023-09-08T07:21:50.936371Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T08:25:32.740153Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T08:40:32.732672Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T09:21:48.903464Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T09:55:13.416172Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T10:05:28.500085Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T11:10:03.799294Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T11:48:04.072832Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T12:15:33.620322Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T12:38:59.615093Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-08T13:59:41.945903Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"]]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[["GetECount_FLEXR_Tables","Succeeded","2023-08-17T06:33:48.202237Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_EEE_Tables","Succeeded","2023-08-17T06:36:13.860009Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_FlexRes2Umspannwerk","Succeeded","2023-08-17T06:36:36.949946Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["Get_ECAnlagen2Umspannwerk","Succeeded","2023-08-17T06:37:00.725488Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_Lieferant_Tables","Succeeded","2023-08-17T06:38:15.774104Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_ZPT_Tables2","Succeeded","2023-08-17T06:39:34.637317Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_ZPT_Tables","Succeeded","2023-08-17T06:50:25.189962Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_Regelzone","Succeeded","2023-08-17T06:54:04.633444Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_SYS_Tables","Succeeded","2023-08-17T07:03:15.363786Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECOUNT_NA_Tables","Succeeded","2023-08-17T07:07:53.032784Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["ingECQueryFlexres","Succeeded","2023-08-17T07:08:08.629406Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_Gruppen_Tables","Succeeded","2023-08-17T07:08:34.641916Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["IngestBIZAGI","Succeeded","2023-08-17T07:10:06.841188Z","/providers/microsof
t.datafactory/factories/dfactory","ADFPipelineRun"],["CreateECountAnlagenExtended","Succeeded","2023-08-17T07:17:26.303981Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateFlxresEinheiten","Succeeded","2023-08-17T07:19:27.731394Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateECountFlxresExtended","Succeeded","2023-08-17T07:21:41.963439Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateGINIUSvsECount","Succeeded","2023-08-17T07:27:04.955413Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateGINIUSvsSoptimEKM","Succeeded","2023-08-17T07:32:25.389198Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CurateSoptimEKMEEGAnlagen","Succeeded","2023-08-17T07:40:31.250797Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CurateFlexibleRessources","Succeeded","2023-08-17T07:42:31.64909Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateBIZAGIunionBestand","Succeeded","2023-08-17T07:43:05.667999Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["probECQueryFlexres","Succeeded","2023-08-17T07:43:24.075925Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateBIZAGIvsExcelAnlage","Succeeded","2023-08-17T07:46:11.773962Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["createBizagiAnlage","Succeeded","2023-08-17T07:46:28.69844Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["ingRawBestandPhotovoltaikExcel","Succeeded","2023-08-17T07:48:33.453314Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curBizagiBestandEKM","Succeeded","2023-08-17T07:48:53.782033Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CurateEinspeiseanlagenExcel","Succeeded","2023-08-17T07:49:28.50735Z","/providers/microsoft.datafactory/factories/
dfactory","ADFPipelineRun"],["DailyJob","Succeeded","2023-08-17T07:49:30.331512Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"]]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[["LogAnalyticsFlow","Succeeded","2023-09-06T11:54:48.911287Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-06T11:54:57.034405Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnalyticsFlow","Succeeded","2023-09-06T13:46:10.397883Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"]]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[["pipeline2","Succeeded","2023-09-05T07:30:48.655564Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["pipeline2","Succeeded","2023-09-05T08:09:49.226269Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["pipeline2","Succeeded","2023-09-05T08:45:34.140599Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-05T09:43:07.986238Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-05T09:54:12.673131Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-05T10:17:50.844621Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-05T11:59:30.793858Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"]]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[]}]}
,{"tables":[{"name":"PrimaryResult","columns":[{"name":"PipelineName","type":"string"},{"name":"Status","type":"string"},{"name":"TimeGenerated","type":"datetime"},{"name":"_ResourceId","type":"string"},{"name":"Type","type":"string"}],"rows":[["GetECount_FLEXR_Tables","Succeeded","2023-09-04T08:00:20.658823Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_EEE_Tables","Succeeded","2023-09-04T08:02:45.282715Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_FlexRes2Umspannwerk","Succeeded","2023-09-04T08:03:09.521574Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["Get_ECAnlagen2Umspannwerk","Succeeded","2023-09-04T08:03:33.191387Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_Lieferant_Tables","Succeeded","2023-09-04T08:04:47.494212Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_ZPT_Tables2","Succeeded","2023-09-04T08:06:08.732235Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_ZPT_Tables","Succeeded","2023-09-04T08:15:48.77315Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-04T08:15:55.073672Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_Regelzone","Succeeded","2023-09-04T08:19:27.165214Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_SYS_Tables","Succeeded","2023-09-04T08:28:29.922937Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECOUNT_NA_Tables","Succeeded","2023-09-04T08:33:08.995352Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["ingECQueryFlexres","Succeeded","2023-09-04T08:33:24.751274Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["GetECount_Gruppen_Tables","Succeeded","2023-09-04T08:33:50.765466Z","/providers/microsoft.
datafactory/factories/dfactory","ADFPipelineRun"],["IngestBIZAGI","Succeeded","2023-09-04T08:35:05.045513Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["LogAnaytics","Succeeded","2023-09-04T08:37:03.591298Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateECountAnlagenExtended","Succeeded","2023-09-04T08:41:56.090302Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateFlxresEinheiten","Succeeded","2023-09-04T08:43:49.781644Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CreateECountFlxresExtended","Succeeded","2023-09-04T08:45:58.749538Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateGINIUSvsECount","Succeeded","2023-09-04T08:51:15.870807Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateGINIUSvsSoptimEKM","Succeeded","2023-09-04T08:56:27.391099Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CurateSoptimEKMEEGAnlagen","Succeeded","2023-09-04T09:04:18.703808Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CurateFlexibleRessources","Succeeded","2023-09-04T09:06:14.962942Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateBIZAGIunionBestand","Succeeded","2023-09-04T09:06:47.728712Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["probECQueryFlexres","Succeeded","2023-09-04T09:07:00.283812Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["curateBIZAGIvsExcelAnlage","Succeeded","2023-09-04T09:09:40.825211Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["createBizagiAnlage","Succeeded","2023-09-04T09:09:54.571941Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["ingRawBestandPhotovoltaikExcel","Succeeded","2023-09-04T09:13:44.550155Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipeline
Run"],["curBizagiBestandEKM","Succeeded","2023-09-04T09:14:03.760771Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["CurateEinspeiseanlagenExcel","Succeeded","2023-09-04T09:14:35.509703Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"],["DailyJob","Succeeded","2023-09-04T09:14:37.30494Z","/providers/microsoft.datafactory/factories/dfactory","ADFPipelineRun"]]}]}
]

使用复制数据活动创建csv文件时,各列使用的映射路径为:

respectively, for each column:
$['tables'][*]['rows'][0][0]
$['tables'][*]['rows'][0][1]
$['tables'][*]['rows'][0][2]
$['tables'][*]['rows'][0][3]
$['tables'][*]['rows'][0][4]

我得到的CSV中,每个JSON文件只对应一个rows数组项,而我想得到所有的rows数组项:

z6psavjg

z6psavjg1#

复制活动无法生成所需的CSV文件,因为它无法处理嵌套数组的值。
您可以使用ADF Dataflow来实现您的要求,而不是复制活动。
在ForEach之后,使用数据流活动调用数据流,并创建一个新的数据流。在数据流的源(Source)中,指定你的源JSON文件。
与复制活动的通配符路径类似,在数据流中也可以为JSON文件指定通配符路径

在源选项中【JSON设置】,选择【文档数组】。
我采用了 **@Pratik Lad** 的SO回答中的相同方法,但做了一些修改,使其适用于多个JSON源文件。

以下是我对上述答案的一些修改:
代理键(Surrogate Key)转换后的数据预览:

您可以看到,由于我们处理的是多个文件,在展平columns数组后会得到重复的行。因此,我添加了一个过滤器转换,只保留所需的值。
ColKey<=5

另外,为了将行作为列,我将column_names列指定为Pivot key

这将给予这样的结果。

但是Union转换只需要表头(列名),所以我在其后又使用了一个过滤器转换,并把条件设为false()。这样就只保留表头,而不保留任何数据行。

现在,在此之后使用Union(联合)转换。在sink中指定csv文件,您将获得如下所示的预期结果。

我的Dataflow JSON供您参考:

{
    "name": "dataflow3",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "source_json_files",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "csv_target",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "flattenForRows"
                },
                {
                    "name": "flattenforcolumns"
                },
                {
                    "name": "pivot1"
                },
                {
                    "name": "surrogateKey1"
                },
                {
                    "name": "filter1"
                },
                {
                    "name": "derivedColumn1"
                },
                {
                    "name": "select1"
                },
                {
                    "name": "union1"
                },
                {
                    "name": "filter2"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          tables as (columns as (name as string, type as string)[], name as string, rows as string[][])[]",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false,",
                "     documentForm: 'arrayOfDocuments',",
                "     wildcardPaths:['*.json']) ~> source1",
                "source1 foldDown(unroll(tables.rows, tables.rows),",
                "     mapColumn(",
                "          rows = tables.rows",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> flattenForRows",
                "source1 foldDown(unroll(tables.columns, tables.columns),",
                "     mapColumn(",
                "          column_names = tables.columns.name",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> flattenforcolumns",
                "filter1 pivot(pivotBy(column_names),",
                "     {} = max(column_names),",
                "     columnNaming: '$N$V',",
                "     lateral: false) ~> pivot1",
                "flattenforcolumns keyGenerate(output(ColKey as long),",
                "     startAt: 1L,",
                "     stepValue: 1L) ~> surrogateKey1",
                "surrogateKey1 filter(ColKey<=5) ~> filter1",
                "flattenForRows derive(rc1 = rows[1],",
                "          rc2 = rows[2],",
                "          rc3 = rows[3],",
                "          rc4 = rows[4],",
                "          rc5 = rows[5]) ~> derivedColumn1",
                "derivedColumn1 select(mapColumn(",
                "          rc1,",
                "          rc2,",
                "          rc3,",
                "          rc4,",
                "          rc5",
                "     ),",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true) ~> select1",
                "filter2, select1 union(byName: false)~> union1",
                "pivot1 filter(false()) ~> filter2",
                "union1 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     partitionFileNames:['target.csv'],",
                "     umask: 0022,",
                "     preCommands: [],",
                "     postCommands: [],",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     partitionBy('hash', 1)) ~> sink1"
            ]
        }
    }
}

相关问题