pyspark 使用最终 Dataframe 的常数值标记文件列中的错误记录

but5z9lq  于 2023-01-25  发布在  Spark
关注(0)|答案(2)|浏览(182)

我有一个用例,其中如果特定文件的err列中至少有一条记录的值为err_present,我希望将同一文件的其余记录标记为 Dataframe 中的bad_file值。
输入 Dataframe

+-----------+---------+
|err        |file_name|
+-----------+---------+
|err_present|f1       |
|           |f1       |
|           |f1       |
|           |f2       |
|           |f2       |
+-----------+---------+

上面的 Dataframe 中f1 file_name列为err_present。因此,我希望在最终 Dataframe 中将包含f1的其他行标记为bad_file

Desired output DF:
+--------+---------+
|err_present|file_name|
+--------+---------+--
|err_present|       f1|
|bad_file   |       f1|
|bad_file   |       f1|
|    null   |       f2|
|    null   |       f2|
+--------+---------+

示例 Dataframe

df = spark.createDataFrame([('err_present', 'f1'), ('', 'f1'), ('', 'f1'),
                        ('', 'f2'), ('', 'f2')]
                       , ['err', 'file_name'])
uemypmqf

uemypmqf1#

#Isolate rows with err_present, rename err column and lit bad_file
s = df.where(col('err')=='err_present').withColumn('newerr',lit('bad_file')).drop('err')

(df.join(broadcast(s),how='left', on='file_name')#broadcast join new df from above to avaoid shuffle
 .withColumn('err', when(col('err')=='err_present',col('err')).otherwise(col('newerr')))#conditionally update err using new_err
 .drop('newerr')#drop unwated column
).show()


+---------+-----------+
|file_name|        err|
+---------+-----------+
|       f1|err_present|
|       f1|   bad_file|
|       f1|   bad_file|
|       f2|       null|
|       f2|       null|
+---------+-----------+
dl5txlt9

dl5txlt92#

如果字符串"err_present"在该特定分区中,则可以按file_namepartition,并创建一个名为err_present_in_group的新列,即True

df.select("*", 
  F.max(
    (F.col('err') == 'err_present')
  ).over(Window.partitionBy('file_name')).alias('err_present_in_group')
).show()

+-----------+---------+--------------------+
|        err|file_name|err_present_in_group|
+-----------+---------+--------------------+
|err_present|       f1|                true|
|           |       f1|                true|
|           |       f1|                true|
|           |       f2|               false|
|           |       f2|               false|
+-----------+---------+--------------------+

然后,通过应用基于errerr_present列中的值的条件,可以确定最终的err_present列。

df.select("*", 
  F.max(
    (F.col('err') == 'err_present')
  ).over(Window.partitionBy('file_name')).alias('err_present_in_group')
).withColumn(
    'err_present',
    F.when(
        (F.col('err_present_in_group')) & (F.col('err') == 'err_present'), 
        F.lit('err_present')
    ).when(
        (F.col('err_present_in_group')) & (F.col('err') == ''), 
        F.lit('bad_file')
    ).otherwise(None)
).select(
    'err_present','file_name'
)

+-----------+---------+
|err_present|file_name|
+-----------+---------+
|err_present|       f1|
|   bad_file|       f1|
|   bad_file|       f1|
|       null|       f2|
|       null|       f2|
+-----------+---------+

相关问题