在Azure Synapse Analitycs或Azure Data Factory中按ID将两个CSV与Upsert(插入或更新)进行比较

uz75evzq  于 2023-05-26  发布在  其他
关注(0)|答案(1)|浏览(107)

我需要比较两个CSV,当IDcsv1等于IDcsv2时,我需要更新数据。当IDcsv2在csv1中不存在时,我需要插入数据。如果可能,我想更新最近的一行,就像最后一次修改一样。先谢谢你了

o2gm4chl

o2gm4chl1#

AFAIK,仅ADF中的数据库数据集支持Currentlty更新选项。由于您的要求是关于csv文件的,因此可以通过两种方式完成。
使用数据流:
目标数据集-将在此数据集中插入或更新数据。

输入数据集-数据将被复制到目标表。

因此,我将这些数据集,Sample2.csv作为源,Sample1.csv作为数据流中的接收器。
我使用了3 exists转换来获取行。
1.对于存在于目标中但不存在于源中的行。
1.同时存在于目标和源中的行
1.对于在目标中不存在但仅在源中存在的行。
然后,我在这3个结果上使用union by Position2次以获得结果数据。现在,我已经把它交给了sink,并在sink中使用了目标数据集。
这是我的数据流:

我的数据流JSON供您参考:

{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "csv1sink",
                        "type": "DatasetReference"
                    },
                    "name": "mysink"
                },
                {
                    "dataset": {
                        "referenceName": "csv2source",
                        "type": "DatasetReference"
                    },
                    "name": "mysource"
                }
            ],
            "sinks": [
                {
                    "dataset": {
                        "referenceName": "csv1sink",
                        "type": "DatasetReference"
                    },
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "joinrows"
                },
                {
                    "name": "sinkrows"
                },
                {
                    "name": "sourcerows"
                },
                {
                    "name": "union1"
                },
                {
                    "name": "union2"
                }
            ],
            "scriptLines": [
                "source(output(",
                "          csv1Id as string,",
                "          name as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> mysink",
                "source(output(",
                "          csv2Id as string,",
                "          name as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> mysource",
                "mysource, mysink exists(csv2Id == csv1Id,",
                "     negate:false,",
                "     broadcast: 'auto')~> joinrows",
                "mysink, mysource exists(csv1Id == csv2Id,",
                "     negate:true,",
                "     broadcast: 'auto')~> sinkrows",
                "mysource, mysink exists(csv2Id == csv1Id,",
                "     negate:true,",
                "     broadcast: 'auto')~> sourcerows",
                "sinkrows, joinrows union(byName: false)~> union1",
                "union1, sourcerows union(byName: false)~> union2",
                "union2 sink(allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     input(",
                "          csv1Id as string,",
                "          name as string",
                "     ),",
                "     partitionFileNames:['Sample1.csv'],",
                "     umask: 0022,",
                "     preCommands: [],",
                "     postCommands: [],",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     partitionBy('hash', 1)) ~> sink1"
            ]
        }
    }
}

数据流执行后的结果:

如果可能,我想更新最近的一行,就像最后一次修改一样。
为了实现这一点,数据流操作可能会变得复杂,因为我们首先需要根据日期对目标数据(匹配的行)进行排序,然后只更新第一个匹配的日期。如果你想做上面的事情,最好遵循下面的方法。

**使用SQL中的存储过程和临时表:**为此,您需要SQL数据库和ADF的链接服务。在此方法中,首先在两个csv数据集上使用两个查找活动。将结果JSON数组传递给存储过程活动。

在SQL中创建了一个存储过程,并在其中写入要upsert的逻辑。您也可以根据最后修改日期编写更新的逻辑。将结果存储到临时表中并为其创建ADF数据集。

在存储过程活动之后,使用一个复制活动,将源作为临时表,并接收到目标csv。这将把记录从临时表复制到目标csv文件。

相关问题