"Exists" condition with a dynamic schema in an Azure Data Factory data flow

q3qa4bjr  posted on 2023-04-22  in Other

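I am using Azure Data Factory with a data flow. In this data flow I want to compare two sources using the Exists transformation. Both sources have the same column names. Only rows from source 1 that do not exist in source 2 should be stored in the sink. The problem arises when configuring the Exists condition. Because I want to use the same pipeline for many datasets, I want to use the custom expression field and late binding to compare the required columns in the two sources. A parameter, $primaryKeys, holds the columns that need to be compared in both sources. This is where I am stuck: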

The expression that gives an error:

Any suggestions? How can I get a dynamic schema to work in this expression?


ss2ws0br  #1

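  • I was able to achieve your requirement by using two data flows instead of one: the first data flow builds the custom expression, and the second implements your exists logic.
  • In the first data flow I used an array parameter for the primary key columns (declared as cols in the script below). For this demonstration I pass static values to it from the pipeline.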

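  • In the first data flow, which declares the array parameter above, use any dataset as the source. Create a derived column transformation with the dynamic value toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result)). Here source1 and source2 are the names of the left and right streams to which I will apply the exists transformation.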

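  • Now we need this value outside the data flow, so I used a cache sink and wrote its output to the activity output.
  • In the second data flow, I created a string parameter custom_expr and passed the value generated above to it.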

  • In the exists transformation of the second data flow, where the custom expression goes, I used the dynamic content toBoolean(expr(toString($custom_expr))).
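To make the string-building step easier to follow, here is a minimal Python sketch of what the map/reduce expression in the first data flow appears to compute. It is not Data Factory code: the function name build_exists_expression and its left/right arguments are hypothetical, and the sketch only mirrors the string concatenation, assuming the seed 'true() ' and the ' && ' separator shown in the expression.

```python
from functools import reduce


def build_exists_expression(cols, left="source1", right="source2"):
    """Mimic reduce(map($cols, ...), 'true() ', #acc + ' && ' + #item, #result).

    `left` and `right` stand in for the hard-coded stream names.
    """
    # map step: one "left@col == right@col" comparison per key column
    comparisons = [f"{left}@{col} == {right}@{col}" for col in cols]
    # reduce step: start from the seed 'true() ' and chain each comparison with ' && '
    return reduce(lambda acc, item: acc + " && " + item, comparisons, "true() ")


print(build_exists_expression(["id", "first_name"]))
```

For the columns id and first_name this yields the same string as the default value of custom_expr in the second data flow's script, including the double space after true() that comes from the seed's trailing blank.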

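This gives the desired result. The output images are listed below for reference.

  • Source 1 data preview:

  • Source 2 data preview:

  • Not-exists result:

  • JSON of the first data flow: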
{
    "name": "dataflow2",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText6",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                }
            ],
            "sinks": [
                {
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "derivedColumn1"
                }
            ],
            "scriptLines": [
                "parameters{",
                "     cols as string[] (['a','b'])",
                "}",
                "source(output(",
                "          id as string,",
                "          first_name as string,",
                "          date as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source1 derive(tp = toString(reduce(map($cols,concat('source1@',#item,' == source2@',#item)),'true() ',#acc+' && '+#item,#result))) ~> derivedColumn1",
                "derivedColumn1 sink(validateSchema: false,",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     store: 'cache',",
                "     format: 'inline',",
                "     output: true,",
                "     saveOrder: 1) ~> sink1"
            ]
        }
    }
}
  • JSON of the second data flow:
{
    "name": "dataflow1",
    "properties": {
        "type": "MappingDataFlow",
        "typeProperties": {
            "sources": [
                {
                    "dataset": {
                        "referenceName": "DelimitedText5",
                        "type": "DatasetReference"
                    },
                    "name": "source1"
                },
                {
                    "dataset": {
                        "referenceName": "DelimitedText6",
                        "type": "DatasetReference"
                    },
                    "name": "source2"
                }
            ],
            "sinks": [
                {
                    "name": "sink1"
                }
            ],
            "transformations": [
                {
                    "name": "exists1"
                }
            ],
            "scriptLines": [
                "parameters{",
                "     custom_expr as string ('true()  && source1@id == source2@id && source1@first_name == source2@first_name')",
                "}",
                "source(output(",
                "          id as string,",
                "          first_name as string,",
                "          date as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source1",
                "source(output(",
                "          id as string,",
                "          first_name as string,",
                "          date as string",
                "     ),",
                "     allowSchemaDrift: true,",
                "     validateSchema: false,",
                "     ignoreNoFilesFound: false) ~> source2",
                "source1, source2 exists(toBoolean(expr(toString($custom_expr))),",
                "     negate:true,",
                "     broadcast: 'both')~> exists1",
                "exists1 sink(validateSchema: false,",
                "     skipDuplicateMapInputs: true,",
                "     skipDuplicateMapOutputs: true,",
                "     store: 'cache',",
                "     format: 'inline',",
                "     output: true,",
                "     saveOrder: 1) ~> sink1"
            ]
        }
    }
}

**Note:** When you reproduce this, the only hard-coded values are the left and right stream names used to build the expression.
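To sanity-check the expected output outside Data Factory, the exists transformation with negate:true can be thought of as an anti-join on the key columns. The Python sketch below illustrates that idea under stated assumptions: the function not_exists and the sample rows are hypothetical (they are not the data from the previews), and the sketch does not try to reproduce the data flow engine's NULL handling.

```python
def not_exists(left_rows, right_rows, key_cols):
    """Return rows of left_rows whose key columns match no row in right_rows."""
    # Collect every key combination present on the right side
    right_keys = {tuple(row[c] for c in key_cols) for row in right_rows}
    # Keep only left rows whose key combination is absent on the right
    return [row for row in left_rows
            if tuple(row[c] for c in key_cols) not in right_keys]


# Hypothetical sample rows for illustration only
source1 = [
    {"id": "1", "first_name": "Ana", "date": "2023-01-01"},
    {"id": "2", "first_name": "Ben", "date": "2023-01-02"},
    {"id": "3", "first_name": "Cy", "date": "2023-01-03"},
]
source2 = [
    {"id": "1", "first_name": "Ana", "date": "2022-12-31"},
    {"id": "3", "first_name": "Cyrus", "date": "2023-01-03"},
]

result = not_exists(source1, source2, ["id", "first_name"])
print(result)
```

With these rows, the row with id 2 is kept because source2 has no such id, and the row with id 3 is kept because first_name differs. The date column is ignored since it is not among the key columns.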
