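I am using Azure Data Factory with a data flow. In this data flow I want to compare two sources using the Exists transformation. Both sources have the same column names. Only rows from source 1 that do not exist in source 2 should be stored in the sink. The problem is configuring the Exists condition. Because I want to use the same pipeline for many datasets, I want to use the custom expression field and implement late binding to compare the required columns in both sources. A parameter ($primaryKeys) holds the columns that need to be compared in the two sources. This is where I am stuck:
The expression that gives an error:
Any suggestions? How can I get the dynamic schema to work in this expression?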
1 Answer
{ "name": "dataflow2", "properties": { "type": "MappingDataFlow", "typeProperties": { "sources": [ { "dataset": { "referenceName": "DelimitedText6", "type": "DatasetReference" }, "name": "source1" } ], "sinks": [ { "name": "sink1" } ], "transformations": [ { "name": "derivedColumn1" } ], "scriptLines": [ "parameters{", " cols as string[] (['a','b'])", "}", "source(output(", " id as string,", " first_name as string,", " date as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: false) ~> source1", "source1 derive(tp = toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result))) ~> derivedColumn1", "derivedColumn1 sink(validateSchema: false,", " skipDuplicateMapInputs: true,", " skipDuplicateMapOutputs: true,", " store: 'cache',", " format: 'inline',", " output: true,", " saveOrder: 1) ~> sink1" ] } } }
{ "name": "dataflow1", "properties": { "type": "MappingDataFlow", "typeProperties": { "sources": [ { "dataset": { "referenceName": "DelimitedText5", "type": "DatasetReference" }, "name": "source1" }, { "dataset": { "referenceName": "DelimitedText6", "type": "DatasetReference" }, "name": "source2" } ], "sinks": [ { "name": "sink1" } ], "transformations": [ { "name": "exists1" } ], "scriptLines": [ "parameters{", " custom_expr as string ('true() && source1@id == source2@id && source1@first_name == source2@first_name')", "}", "source(output(", " id as string,", " first_name as string,", " date as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: false) ~> source1", "source(output(", " id as string,", " first_name as string,", " date as string", " ),", " allowSchemaDrift: true,", " validateSchema: false,", " ignoreNoFilesFound: false) ~> source2", "source1, source2 exists(toBoolean(expr(toString($custom_expr))),", " negate:true,", " broadcast: 'both')~> exists1", "exists1 sink(validateSchema: false,", " skipDuplicateMapInputs: true,", " skipDuplicateMapOutputs: true,", " store: 'cache',", " format: 'inline',", " output: true,", " saveOrder: 1) ~> sink1" ] } } }
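`primitiveKeys` is an array parameter that holds the key column names. For this demonstration I pass static values to it from the pipeline (the dataflow2 script above declares it as `cols`). In a derived column I build the comparison as a string with `toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result))`. Here `source1` and `source2` are the names of the left and right streams to which I apply the exists transformation. The data flow that does the comparison (dataflow1 above) takes a string parameter, `custom_expr`, and I pass the value generated above to it. The custom expression of the exists transformation is then `toBoolean(expr(toString($custom_expr)))`. This gives the desired result. The original answer includes an output image for reference.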
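To make the string that the `map`/`reduce` expression produces easier to see, here is a minimal Python sketch that mirrors it outside of Data Factory. It is only an illustration: the function name `build_exists_expression` and the sample column list are assumptions, and Python's `functools.reduce` stands in for the data flow `reduce` function.

```python
from functools import reduce


def build_exists_expression(cols, left="source1", right="source2"):
    """Mirror of the data flow expression, written in Python for illustration.

    map(...)    -> one "left@col == right@col" comparison per key column
    reduce(...) -> the comparisons joined with ' && ', seeded with 'true() '
    """
    comparisons = [f"{left}@{col} == {right}@{col}" for col in cols]
    return reduce(lambda acc, item: acc + " && " + item, comparisons, "true() ")


# Sample key columns (hypothetical; any list of column names works).
expression = build_exists_expression(["id", "first_name"])
print(expression)
# The seed 'true() ' ends with a space and ' && ' starts with one,
# so the result contains two spaces after 'true()'.
```

The resulting string has the same shape as the default value of `custom_expr` in the dataflow1 script, apart from the extra space after `true()`.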
**Note:** When you replicate this, the only hard-coded values are the left and right stream names used to build the expression.
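The exists step in the dataflow1 script sets `negate:true`, so it keeps only the left-stream rows that have no match in the right stream. The Python sketch below shows that intended result on plain dictionaries, assuming simple equality on every key column. The function name `rows_not_in_right` and the sample rows are hypothetical, and null handling in the real data flow may differ.

```python
def rows_not_in_right(left_rows, right_rows, key_cols):
    """Return rows of left_rows whose key columns match no row in right_rows.

    Illustrative model of an exists transformation with negate enabled;
    it assumes plain equality on all key columns and ignores null semantics.
    """
    right_keys = {tuple(row[col] for col in key_cols) for row in right_rows}
    return [
        row
        for row in left_rows
        if tuple(row[col] for col in key_cols) not in right_keys
    ]


# Hypothetical sample data using the column names from the script.
source1 = [
    {"id": "1", "first_name": "Ann", "date": "2023-01-01"},
    {"id": "2", "first_name": "Bob", "date": "2023-01-02"},
]
source2 = [
    {"id": "1", "first_name": "Ann", "date": "2023-02-01"},
]

missing = rows_not_in_right(source1, source2, ["id", "first_name"])
print(missing)
```

Because only the key columns are compared, the differing `date` value on the first row does not affect the match.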