我需要比较两个CSV,当IDcsv1等于IDcsv2时,我需要更新数据。当IDcsv2在csv1中不存在时,我需要插入数据。如果可能,我想更新最近的一行,就像最后一次修改一样。先谢谢你了
o2gm4chl1#
AFAIK,仅ADF中的数据库数据集支持Currentlty更新选项。由于您的要求是关于csv文件的,因此可以通过两种方式完成。使用数据流:目标数据集-将在此数据集中插入或更新数据。
输入数据集-数据将被复制到目标表。
因此,我将这些数据集,Sample2.csv作为源,Sample1.csv作为数据流中的接收器。我使用了3 exists转换来获取行。1.对于存在于目标中但不存在于源中的行。1.同时存在于目标和源中的行1.对于在目标中不存在但仅在源中存在的行。然后,我在这3个结果上使用union by Position2次以获得结果数据。现在,我已经把它交给了sink,并在sink中使用了目标数据集。这是我的数据流:
我的数据流JSON供您参考:
{ "name": "dataflow1", "properties": { "type": "MappingDataFlow", "typeProperties": { "sources": [ { "dataset": { "referenceName": "csv1sink", "type": "DatasetReference" }, "name": "mysink" }, { "dataset": { "referenceName": "csv2source", "type": "DatasetReference" }, "name": "mysource" } ], "sinks": [ { "dataset": { "referenceName": "csv1sink", "type": "DatasetReference" }, "name": "sink1" } ], "transformations": [ { "name": "joinrows" }, { "name": "sinkrows" }, { "name": "sourcerows" }, { "name": "union1" }, { "name": "union2" } ], "scriptLines": [ "source(output(", " csv1Id as string,", " name as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: false) ~> mysink", "source(output(", " csv2Id as string,", " name as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: false) ~> mysource", "mysource, mysink exists(csv2Id == csv1Id,", " negate:false,", " broadcast: 'auto')~> joinrows", "mysink, mysource exists(csv1Id == csv2Id,", " negate:true,", " broadcast: 'auto')~> sinkrows", "mysource, mysink exists(csv2Id == csv1Id,", " negate:true,", " broadcast: 'auto')~> sourcerows", "sinkrows, joinrows union(byName: false)~> union1", "union1, sourcerows union(byName: false)~> union2", "union2 sink(allowSchemaDrift: true,", " validateSchema: false,", " input(", " csv1Id as string,", " name as string", " ),", " partitionFileNames:['Sample1.csv'],", " umask: 0022,", " preCommands: [],", " postCommands: [],", " skipDuplicateMapInputs: true,", " skipDuplicateMapOutputs: true,", " partitionBy('hash', 1)) ~> sink1" ] } } }
数据流执行后的结果:
如果可能,我想更新最近的一行,就像最后一次修改一样。为了实现这一点,数据流操作可能会变得复杂,因为我们首先需要根据日期对目标数据(匹配的行)进行排序,然后只更新第一个匹配的日期。如果你想做上面的事情,最好遵循下面的方法。
**使用SQL中的存储过程和临时表:**为此,您需要SQL数据库和ADF的链接服务。在此方法中,首先在两个csv数据集上使用两个查找活动。将结果JSON数组传递给存储过程活动。
在SQL中创建了一个存储过程,并在其中写入要upsert的逻辑。您也可以根据最后修改日期编写更新的逻辑。将结果存储到临时表中并为其创建ADF数据集。
在存储过程活动之后,使用一个复制活动,将源作为临时表,并接收到目标csv。这将把记录从临时表复制到目标csv文件。
1条答案
按热度按时间o2gm4chl1#
AFAIK,仅ADF中的数据库数据集支持Currentlty更新选项。由于您的要求是关于csv文件的,因此可以通过两种方式完成。
使用数据流:
目标数据集-将在此数据集中插入或更新数据。
输入数据集-数据将被复制到目标表。
因此,我将这些数据集,Sample2.csv作为源,Sample1.csv作为数据流中的接收器。
我使用了3 exists转换来获取行。
1.对于存在于目标中但不存在于源中的行。
1.同时存在于目标和源中的行
1.对于在目标中不存在但仅在源中存在的行。
然后,我在这3个结果上使用union by Position2次以获得结果数据。现在,我已经把它交给了sink,并在sink中使用了目标数据集。
这是我的数据流:
我的数据流JSON供您参考:
数据流执行后的结果:
如果可能,我想更新最近的一行,就像最后一次修改一样。
为了实现这一点,数据流操作可能会变得复杂,因为我们首先需要根据日期对目标数据(匹配的行)进行排序,然后只更新第一个匹配的日期。如果你想做上面的事情,最好遵循下面的方法。
**使用SQL中的存储过程和临时表:**为此,您需要SQL数据库和ADF的链接服务。在此方法中,首先在两个csv数据集上使用两个查找活动。将结果JSON数组传递给存储过程活动。
在SQL中创建了一个存储过程,并在其中写入要upsert的逻辑。您也可以根据最后修改日期编写更新的逻辑。将结果存储到临时表中并为其创建ADF数据集。
在存储过程活动之后,使用一个复制活动,将源作为临时表,并接收到目标csv。这将把记录从临时表复制到目标csv文件。