PySpark: append values from a row to an existing value in Databricks

Posted by xnifntxz on 2023-03-11 in Spark

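I'm new to Databricks, so forgive me if this sounds silly. I'm running validations on a DataFrame, and I have defined one function per validation, for example one for the null check and one for Date_range. Every time a row fails a validation rule, the function should put 1 in the Validation_remark column, as shown below: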

Name    ID  Date_of_Birth  position   Validation remarks
dam     1   02-04-1992     Manager
dana        02-04-1992     Associate  1
rich    3   02-04-1992     VP
danial  4   02-04-1992     CEO
mathew      02-04-1910     Manager    1

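The problem is that I can't tell why the function marked a row with 1: is it because the ID column is null, because Date_of_Birth is more than 100 years ago, or both?
So I'd like to know whether I can append the reason instead, as below: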

Name    ID  Date_of_Birth  position   Validation remarks
dam     1   02-04-1992     Manager
dana        02-04-1992     Associate  ID is null
rich    3   02-04-1992     VP
danial  4   02-04-1992     CEO
mathew      02-04-1910     Manager    ['ID is null', 'Date_of_Birth is > 100 years']

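That is, if a row's ID is null, record that; if its date of birth is more than 100 years ago, append that remark as well, as shown above.
I just want to know how to append values to the Validation remarks column.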
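Independently of Spark, the idea of replacing a 0/1 flag with the list of rules that failed can be sketched in plain Python. This is only an illustration under assumptions: the rule list, the helper names `age_in_years` and `validation_remarks`, and reading 02-04-1992 as 2 April 1992 are all hypothetical.

```python
from datetime import date


def age_in_years(born, today):
    """Whole years between born and today (one less if the birthday has not come yet)."""
    return today.year - born.year - ((today.month, today.day) < (born.month, born.day))


# Hypothetical rules: each pairs a predicate that is True when the rule is violated
# with the remark to record for that row.
RULES = [
    (lambda r: r.get("ID") in (None, ""), "ID is null"),
    (lambda r: age_in_years(r["Date_of_Birth"], date.today()) > 100,
     "Date_of_Birth is > 100 years"),
]


def validation_remarks(record):
    """Return the remark of every violated rule, in rule order (empty list if none)."""
    return [remark for is_violated, remark in RULES if is_violated(record)]


records = [
    {"Name": "dam", "ID": 1, "Date_of_Birth": date(1992, 4, 2), "position": "Manager"},
    {"Name": "dana", "ID": None, "Date_of_Birth": date(1992, 4, 2), "position": "Associate"},
    {"Name": "mathew", "ID": None, "Date_of_Birth": date(1910, 4, 2), "position": "Manager"},
]

for rec in records:
    print(rec["Name"], validation_remarks(rec))
# dam []
# dana ['ID is null']
# mathew ['ID is null', 'Date_of_Birth is > 100 years']
```

Keeping the rules as data means adding a new check is one more entry in `RULES`, and every failing rule contributes its own message instead of a shared flag.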

Answer #1 by v8wbuo2f

You can do this in PySpark with a custom UDF.

from datetime import date

from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

# Create example DataFrame (missing IDs are empty strings)
df = spark.createDataFrame([("dam", "1", "02-04-1992", "Manager"),
                            ("dana", "", "02-04-1992", "Associate"),
                            ("rich", "3", "02-04-1992", "VP"),
                            ("danial", "4", "02-04-1992", "CEO"),
                            ("mathew", "", "02-04-1910", "Manager")],
                           ["Name", "ID", "Date_of_birth", "position"])
df.show()
# +------+---+-------------+---------+
# |  Name| ID|Date_of_birth| position|
# +------+---+-------------+---------+
# |   dam|  1|   02-04-1992|  Manager|
# |  dana|   |   02-04-1992|Associate|
# |  rich|  3|   02-04-1992|       VP|
# |danial|  4|   02-04-1992|      CEO|
# |mathew|   |   02-04-1910|  Manager|
# +------+---+-------------+---------+

# Python function that checks ID and date of birth and returns a list of remarks
def validate_row(row):
    remarks = []
    if not row.ID:  # treats both None and "" as missing
        remarks.append("ID is null")
    # Coarse age check: compares the birth year with the current year
    if (row.Date_of_birth is not None and
            int(row.Date_of_birth.split("-")[-1]) <= date.today().year - 100):
        remarks.append("Date_of_Birth is > 100 years")
    return remarks

# Wrap the Python function in a UDF that returns array<string>
validate_udf = udf(validate_row, ArrayType(StringType()))

# Pack all columns into one struct so the UDF receives one Row per record
df = df.withColumn("Validation remarks", validate_udf(struct(*df.columns)))
df.show(truncate=False)
# +------+---+-------------+---------+------------------------------------------+
# |Name  |ID |Date_of_birth|position |Validation remarks                        |
# +------+---+-------------+---------+------------------------------------------+
# |dam   |1  |02-04-1992   |Manager  |[]                                        |
# |dana  |   |02-04-1992   |Associate|[ID is null]                              |
# |rich  |3  |02-04-1992   |VP       |[]                                        |
# |danial|4  |02-04-1992   |CEO      |[]                                        |
# |mathew|   |02-04-1910   |Manager  |[ID is null, Date_of_Birth is > 100 years]|
# +------+---+-------------+---------+------------------------------------------+

Answer #2 by pieyvz9o

Here are my two cents:
1. Create the DataFrame as follows:

+------+----+-------------+---------+
|  Name|  ID|Date_of_Birth| position|
+------+----+-------------+---------+
|   dam|   1|   1992-04-02|  Manager|
|  dana|null|   1992-04-02|Associate|
|  rich|   3|   1992-04-02|       VP|
|danial|   4|   1992-04-02|      CEO|
|mathew|null|   1910-04-02|  Manager|
+------+----+-------------+---------+

2. Put all the conditions into a list (cond_result_list):

from pyspark.sql.functions import array_remove, col, concat, expr, lit, split

# Start every row with an empty remarks string
df_input = df_input.withColumn('Validation remarks', lit(''))

# Each entry: [SQL condition, remark to record when the condition is true]
cond_result_list = [
    ["id is null", 'Id is Null'],
    ["DATEDIFF(CURRENT_DATE(), Date_of_Birth) / 365.25 > 100", 'Date_of_Birth is > 100 years']
]

# Append '|<remark>' for every condition the row matches
for condition, remark in cond_result_list:
    df_input = df_input.withColumn(
        'Validation remarks',
        concat(col('Validation remarks'),
               expr("case when {} then '|{}' else '' end".format(condition, remark)))
    )

# Split on '|' and drop the empty leading element to get an array of remarks
df_input = df_input.withColumn("Validation remarks", split(df_input["Validation remarks"], r"\|"))
df_input = df_input.withColumn("Validation remarks", array_remove(df_input["Validation remarks"], ""))

3. Print the DataFrame:

df_input.show(truncate=False)
+------+----+-------------+---------+------------------------------------------+
|Name  |ID  |Date_of_Birth|position |Validation remarks                        |
+------+----+-------------+---------+------------------------------------------+
|dam   |1   |1992-04-02   |Manager  |[]                                        |
|dana  |null|1992-04-02   |Associate|[Id is Null]                              |
|rich  |3   |1992-04-02   |VP       |[]                                        |
|danial|4   |1992-04-02   |CEO      |[]                                        |
|mathew|null|1910-04-02   |Manager  |[Id is Null, Date_of_Birth is > 100 years]|
+------+----+-------------+---------+------------------------------------------+
