PySpark:应用数据集中定义的比较运算

dfuffjeb  于 2022-11-01  发布在  Spark
关注(0)|答案(3)|浏览(86)

我有一个表,其中有几个字段,我需要对这些字段进行数据质量检查。
数据质量检查被定义为第二表中的规则。

数据表:

| ID | Name1 | Name2 | Zip1 | Zip2 |
| --- | --- | --- | --- | --- |
| 001 | John | John | 123 | 123 |
| 002 | Sara | Sarah | 234 | 234 |
| 003 | Bill | William | 999 | 111 |
| 004 | Lisa | Lisa | 888 | 333 |
| 005 | Martin | Martin | 345 | 345 |
| 006 | Margaret | Margaret | 456 | 456 |
| 007 | Oscar | Oscar | 678 | 678 |
| 008 | Peter | Peter | 789 | 789 |

规则表:

| ID | FieldLeft | FieldRight | ComparisonOperation |
| --- | --- | --- | --- |
| R001 | Name1 | Name2 | EQUALS |
| R002 | Zip1 | Zip2 | EQUALS |
所以规则基本上是说:Name1 = Name2 且 Zip1 = Zip2。
预期的输出是不符合规则的记录:每条记录每违反一条规则,就应生成一行(请参见记录003,它的名称和邮编都不一致,因此记录003在结果中有两行)。

输出:

| Rule | ID | FieldLeft | FieldRight |
| --- | --- | --- | --- |
| R001 | 002 | Sara | Sarah |
| R001 | 003 | Bill | William |
| R002 | 003 | 999 | 111 |
| R002 | 004 | 888 | 333 |

k75qkfdt

k75qkfdt1#

下面是我的实现:

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Sample records to validate; every value is stored as a string.
df = spark.createDataFrame(
    [
        ("001", "John", "John", "123", "123"),
        ("002", "Sara", "Sarah", "234", "234"),
        ("003", "Bill", "William", "999", "111"),
        ("004", "Lisa", "Lisa", "888", "333"),
        ("005", "Martin", "Martin", "345", "345"),
        ("006", "Margaret", "Margaret", "456", "456"),
        ("007", "Oscar", "Oscar", "678", "678"),
        ("008", "Peter", "Peter", "789", "789"),
    ],
    ["ID", "Name1", "Name2", "Zip1", "Zip2"],
)

# df.show()

# One row per data-quality rule: the two columns of `df` to compare and the
# comparison that has to hold between them.
rule_df = spark.createDataFrame(
    [
        ("R001", "Name1", "Name2", "EQUALS"),
        ("R002", "Zip1", "Zip2", "EQUALS"),
    ],
    ["ID", "FieldLeft", "FieldRight", "ComparisonOperation"],
)

# rule_df.show()

# Translate the textual comparison into a SQL operator and build a filter
# expression such as "Name1 == Name2". Rules whose comparison is not one of
# the four supported values are dropped.
final_rule_df = (rule_df
    .withColumn(
        "operator",
        F.when(
            F.lower(F.col("ComparisonOperation")) == "equals",
            F.lit(" == "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "not equals",
            F.lit(" != "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "greater than",
            F.lit(" > "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "less than",
            F.lit(" < "),
        )
        .otherwise(F.lit("operator_na")),
    )
    .filter(F.col("operator") != "operator_na")
    # `concat` must be qualified: only the `F` alias is imported, so a bare
    # `concat(...)` raises NameError.
    .withColumn(
        "expression",
        F.concat(F.col("FieldLeft"), F.col("operator"), F.col("FieldRight")),
    )
    .drop("operator")
)
final_rule_df.show(truncate=False)

# Schema of the final report: one row per (rule, record) violation.
schema = StructType(
    [
        StructField("Rule", StringType(), True),
        StructField("ID", StringType(), True),
        StructField("FieldLeft", StringType(), True),
        StructField("FieldRight", StringType(), True),
    ]
)

# Empty accumulator with the target schema; each rule's violations are
# unioned into it below.
final_non_compliant_df = spark.createDataFrame(
    spark.sparkContext.emptyRDD(), schema
)

# collect() brings every rule to the driver so the rules can be applied to
# `df` one at a time.
rule_df_rows = final_rule_df.collect()
for row in rule_df_rows:
    rule_id = row.ID
    print(f"rule_id: {rule_id}")

    expression = row.expression
    print(f"expression: {expression}")

    # Records that satisfy the rule. Kept under its own name so the rule
    # table `rule_df` is not overwritten inside the loop.
    compliant_df = df.filter(F.expr(expression))

    # Everything that is not compliant violates the rule. Because filter()
    # drops rows where the expression is NULL, those rows are reported here
    # as well. subtract() also removes duplicate rows.
    non_compliant_df = (df.subtract(compliant_df)
        .withColumn("Rule", F.lit(rule_id))
        .withColumn("FieldLeft", F.col(row.FieldLeft))
        .withColumn("FieldRight", F.col(row.FieldRight))
        .selectExpr("Rule", "ID", "FieldLeft", "FieldRight")
    )
    non_compliant_df.show()
    final_non_compliant_df = final_non_compliant_df.union(non_compliant_df)

final_non_compliant_df.show()

输出:

+----+---------+----------+-------------------+--------------+
|ID  |FieldLeft|FieldRight|ComparisonOperation|expression    |
+----+---------+----------+-------------------+--------------+
|R001|Name1    |Name2     |EQUALS             |Name1 == Name2|
|R002|Zip1     |Zip2      |EQUALS             |Zip1 == Zip2  |
+----+---------+----------+-------------------+--------------+

rule_id: R001
expression: Name1 == Name2
+----+---+---------+----------+
|Rule| ID|FieldLeft|FieldRight|
+----+---+---------+----------+
|R001|003|     Bill|   William|
|R001|002|     Sara|     Sarah|
+----+---+---------+----------+

rule_id: R002
expression: Zip1 == Zip2
+----+---+---------+----------+
|Rule| ID|FieldLeft|FieldRight|
+----+---+---------+----------+
|R002|004|      888|       333|
|R002|003|      999|       111|
+----+---+---------+----------+

最终输出:

+----+---+---------+----------+
|Rule| ID|FieldLeft|FieldRight|
+----+---+---------+----------+
|R001|003|     Bill|   William|
|R001|002|     Sara|     Sarah|
|R002|004|      888|       333|
|R002|003|      999|       111|
+----+---+---------+----------+
3qpi33ja

3qpi33ja2#

@hbit我不确定没有显式循环的完整解决方案。我已经使用交叉连接为每个记录添加一个规则,创建一个笛卡尔结果集。您可以看到eval为false的记录,您应该能够根据规则列提取所有正确的false记录。

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Sample records to validate; every value is stored as a string.
df = spark.createDataFrame(
    [
        ("001", "John", "John", "123", "123"),
        ("002", "Sara", "Sarah", "234", "234"),
        ("003", "Bill", "William", "999", "111"),
        ("004", "Lisa", "Lisa", "888", "333"),
        ("005", "Martin", "Martin", "345", "345"),
        ("006", "Margaret", "Margaret", "456", "456"),
        ("007", "Oscar", "Oscar", "678", "678"),
        ("008", "Peter", "Peter", "789", "789"),
    ],
    ["ID", "Name1", "Name2", "Zip1", "Zip2"],
)

# One row per data-quality rule: the two columns of `df` to compare and the
# comparison that has to hold between them.
rule_df = spark.createDataFrame(
    [
        ("R001", "Name1", "Name2", "EQUALS"),
        ("R002", "Zip1", "Zip2", "EQUALS"),
    ],
    ["ID", "FieldLeft", "FieldRight", "ComparisonOperation"],
)

# rule_df.show()

# Prefix that turns a column name into Python source text such as "df.Name1".
# It must match the name of the variable holding the data frame, because the
# resulting strings are passed to eval() below.
main_df_name = "df."
df_cols = [f"{main_df_name}{x}" for x in df.columns]

# print(df_cols)

# Build, per rule, the source text of a PySpark column expression, e.g.
# "(df.Name1 == df.Name2).alias('R001')". Rules whose comparison is not one
# of the four supported values are dropped.
f_rule_df = (rule_df
    .withColumn(
        "operator",
        F.when(
            F.lower(F.col("ComparisonOperation")) == "equals",
            F.lit(" == "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "not equals",
            F.lit(" != "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "greater than",
            F.lit(" > "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "less than",
            F.lit(" < "),
        )
        .otherwise(F.lit("operator_na")),
    )
    .filter(F.col("operator") != "operator_na")
    # `concat` must be qualified: only the `F` alias is imported, so a bare
    # `concat(...)` raises NameError.
    .withColumn("FieldLeft", F.concat(F.lit(main_df_name), F.col("FieldLeft")))
    .withColumn("FieldRight", F.concat(F.lit(main_df_name), F.col("FieldRight")))
    .withColumn(
        "expression",
        F.concat(
            F.lit("("),
            F.col("FieldLeft"),
            F.col("operator"),
            F.col("FieldRight"),
            F.lit(").alias('"),
            F.col("ID"),
            F.lit("')"),
        ),
    )
    .drop("operator")
)
f_rule_df.show(truncate=False)

# The prefix has to name the variable defined above (`f_rule_df`); the
# previous "final_rule_df." referred to a variable this script never creates.
f_rule_df_name = "f_rule_df."
f_rule_df_cols = [f"{f_rule_df_name}{x}" for x in f_rule_df.columns]

# print(f_rule_df_cols)

# One expression string per distinct rule, collected to the driver.
expressions = f_rule_df.select("expression").distinct().collect()

exp_array = [str(x.expression) for x in expressions]

# print(exp_array)

exp_array_cols = df_cols + f_rule_df_cols + exp_array

# Cross join pairs every record with every rule; each rule then adds one
# boolean column (named after the rule ID) telling whether the record passes.
# NOTE(security): eval() executes the rule-derived strings as Python code.
# Only use this with a trusted rule table; never feed it user-supplied rules.
final_df = (
    df.crossJoin(f_rule_df)
    .select([eval(x) for x in exp_array_cols])
    .drop("ComparisonOperation")
    .drop("expression")
)

final_df.show()

+----+---------+----------+-------------------+------------------------------------+
|ID  |FieldLeft|FieldRight|ComparisonOperation|expression                          |
+----+---------+----------+-------------------+------------------------------------+
|R001|df.Name1 |df.Name2  |EQUALS             |(df.Name1 == df.Name2).alias('R001')|
|R002|df.Zip1  |df.Zip2   |EQUALS             |(df.Zip1 == df.Zip2).alias('R002')  |
+----+---------+----------+-------------------+------------------------------------+

+---+--------+--------+----+----+----+---------+----------+-----+-----+
| ID|   Name1|   Name2|Zip1|Zip2|  ID|FieldLeft|FieldRight| R001| R002|
+---+--------+--------+----+----+----+---------+----------+-----+-----+
|001|    John|    John| 123| 123|R001| df.Name1|  df.Name2| true| true|
|001|    John|    John| 123| 123|R002|  df.Zip1|   df.Zip2| true| true|
|002|    Sara|   Sarah| 234| 234|R001| df.Name1|  df.Name2|false| true|
|002|    Sara|   Sarah| 234| 234|R002|  df.Zip1|   df.Zip2|false| true|
|003|    Bill| William| 999| 111|R001| df.Name1|  df.Name2|false|false|
|003|    Bill| William| 999| 111|R002|  df.Zip1|   df.Zip2|false|false|
|004|    Lisa|    Lisa| 888| 333|R001| df.Name1|  df.Name2| true|false|
|004|    Lisa|    Lisa| 888| 333|R002|  df.Zip1|   df.Zip2| true|false|
|005|  Martin|  Martin| 345| 345|R001| df.Name1|  df.Name2| true| true|
|005|  Martin|  Martin| 345| 345|R002|  df.Zip1|   df.Zip2| true| true|
|006|Margaret|Margaret| 456| 456|R001| df.Name1|  df.Name2| true| true|
|006|Margaret|Margaret| 456| 456|R002|  df.Zip1|   df.Zip2| true| true|
|007|   Oscar|   Oscar| 678| 678|R001| df.Name1|  df.Name2| true| true|
|007|   Oscar|   Oscar| 678| 678|R002|  df.Zip1|   df.Zip2| true| true|
|008|   Peter|   Peter| 789| 789|R001| df.Name1|  df.Name2| true| true|
|008|   Peter|   Peter| 789| 789|R002|  df.Zip1|   df.Zip2| true| true|
+---+--------+--------+----+----+----+---------+----------+-----+-----+
rxztt3cl

rxztt3cl3#

这是我的第三次实现尝试。这一次使用动态 SQL:只循环遍历所有规则来拼出相应的 SQL CASE 字符串,然后把它一次性应用于主数据集。唯一需要注意的是,同一条记录即使违反了两条规则,用这种方法也只会显示一次(CASE 只返回第一条命中的规则)。我确信有办法解决这个问题。

from pyspark.sql import functions as F
from pyspark.sql.types import *

# Sample records to validate; every value is stored as a string.
df = spark.createDataFrame(
    [
        ("001", "John", "John", "123", "123"),
        ("002", "Sara", "Sarah", "234", "234"),
        ("003", "Bill", "William", "999", "111"),
        ("004", "Lisa", "Lisa", "888", "333"),
        ("005", "Martin", "Martin", "345", "345"),
        ("006", "Margaret", "Margaret", "456", "456"),
        ("007", "Oscar", "Oscar", "678", "678"),
        ("008", "Peter", "Peter", "789", "789"),
    ],
    ["ID", "Name1", "Name2", "Zip1", "Zip2"],
)
# Register the data so the generated SQL below can query it as `v_df`.
df.createOrReplaceTempView("v_df")

# One row per data-quality rule: the two columns of `df` to compare and the
# comparison that has to hold between them.
rule_df = spark.createDataFrame(
    [
        ("R001", "Name1", "Name2", "EQUALS"),
        ("R002", "Zip1", "Zip2", "EQUALS"),
    ],
    ["rule_id", "FieldLeft", "FieldRight", "ComparisonOperation"],
)
rule_df.show()

# Translate the textual comparison into a SQL operator and build a
# parenthesised SQL predicate such as "(Name1 == Name2)". Rules whose
# comparison is not one of the four supported values are dropped.
f_rule_df = (rule_df
    .withColumn(
        "operator",
        F.when(
            F.lower(F.col("ComparisonOperation")) == "equals",
            F.lit(" == "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "not equals",
            F.lit(" != "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "greater than",
            F.lit(" > "),
        )
        .when(
            F.lower(F.col("ComparisonOperation")) == "less than",
            F.lit(" < "),
        )
        .otherwise(F.lit("operator_na")),
    )
    .filter(F.col("operator") != "operator_na")
    # `concat` must be qualified: only the `F` alias is imported, so a bare
    # `concat(...)` raises NameError.
    .withColumn(
        "expression",
        F.concat(
            F.lit("("),
            F.col("FieldLeft"),
            F.col("operator"),
            F.col("FieldRight"),
            F.lit(")"),
        ),
    )
    .drop("operator")
)
f_rule_df.show()

# collect() brings every rule to the driver so the SQL text can be assembled
# in plain Python.
rule_df_rows = f_rule_df.collect()

# Four CASE expressions are built in parallel, one WHEN branch per rule:
# the violated rule ID, a pass/fail flag, and the two offending values.
rule_array = ["case \n"]
pass_array = ["case \n"]
field_left_array = ["case \n"]
field_right_array = ["case \n"]

for row in rule_df_rows:
    rule_id = row.rule_id
    print(f"rule_id: {rule_id}")

    expression = row.expression
    print(f"expression: {expression}")

    field_left = row.FieldLeft
    print(f"field_left: {field_left}")

    field_right = row.FieldRight
    print(f"field_right: {field_right}")

    # A branch fires when the rule's predicate evaluates to false.
    rule_array.append(f"\t when {expression} = false then '{rule_id}' \n")
    pass_array.append(f"\t when {expression} = false then false \n")
    field_left_array.append(f"\t when {expression} = false then {field_left} \n")
    field_right_array.append(f"\t when {expression} = false then {field_right} \n")

# Close each CASE with the value used when no rule is violated.
rule_array.append("\t else 'NA' \nend as rule_id")
f_rule_str = "".join(rule_array)

# print(f_rule_str)

pass_array.append("\t else true \nend as pass_flag")
f_pass_str = "".join(pass_array)

# print(f_pass_str)

field_left_array.append("\t else 'NA' \nend as FieldLeft")
f_field_left_str = "".join(field_left_array)

# print(f_field_left_str)

field_right_array.append("\t else 'NA' \nend as FieldRight")
f_field_right_str = "".join(field_right_array)

# print(f_field_right_str)

f_case_str = ",\n".join(
    [f_rule_str, f_pass_str, f_field_left_str, f_field_right_str]
)
print(f_case_str)

# NOTE: a CASE expression returns its first matching branch only, so a record
# that violates several rules is reported once, under the first rule it fails.
# NOTE(security): column names and rule IDs are interpolated into the SQL
# text unescaped; only use this with a trusted rule table.
f_df = spark.sql(
f"""
    select 
            *,
            {f_case_str}
    from v_df      
"""
)
# Keep only the records that failed at least one rule.
f_df = (f_df
    .filter(~F.col("pass_flag"))
    .select("rule_id", "ID", "FieldLeft", "FieldRight")
)
f_df.show()

+-------+---+---------+----------+
|rule_id| ID|FieldLeft|FieldRight|
+-------+---+---------+----------+
|   R001|002|     Sara|     Sarah|
|   R001|003|     Bill|   William|
|   R002|004|      888|       333|
+-------+---+---------+----------+

相关问题