PySpark: flag mismatched records between two DataFrames

cyvaqqii  posted 2022-11-01  in Spark

I have a benchmark DataFrame:

my_id    parent_id    attribute_1    attribute_2     attribute_3       attribute_4
  ABC          DEF             A-          378.8          Accept             False
  ABS          DES             A-          388.8          Accept             False
  ABB          DEG             A           908.8          Decline             True
  ABB          DEG             B-          378.8          Accept             False
  APP          DRE             C-          370.8          Accept              True

and a DataFrame to check against it:

my_id    parent_id    Attribute_1     attribute2           attr_3        attribute_5
  ABC          DEF             A-          478.8          Decline              StRing
  ABS          DES             A-          388.8          Accept               String
  ABB          DEG             A           908.8          Accept               StrIng
  ABB          DEG             C-          378.8          Accept               String
  APP          DRE             C-          370.8          Accept               STring

As you can see, errors occasionally appear in attribute_1, attribute_2, or attribute_3 (these columns have different names in the two frames, but they should contain the same content).
How can I flag the faulty records when checking, row by row, whether these three attributes exactly match the benchmark? I would like output similar to the following:

faulty_rows = 

    my_id    parent_id    Attribute_1     attribute2           attr_3       faulty_attr 
      ABC          DEF             A-          478.8          Decline       [attribute2, attr_3]                  
      ABB          DEG             A           908.8          Accept        [attr_3]      
      ABB          DEG             C-          378.8          Accept        [Attribute_1]

What I have done so far is rename the columns and join on one column at a time, which tells me which column is wrong, but I would like to check the whole row at once and flag where the errors are. Is that possible? Either a PySpark or a pandas solution is fine; I am interested in the logic.
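For reference, the row-wise comparison being asked for can be sketched in plain pandas, assuming the two frames are row-aligned (the frames below reproduce the sample data from the question):

```python
import pandas as pd

# Benchmark frame and the frame to check (sample data from the question)
bench = pd.DataFrame({
    'my_id': ['ABC', 'ABS', 'ABB', 'ABB', 'APP'],
    'parent_id': ['DEF', 'DES', 'DEG', 'DEG', 'DRE'],
    'attribute_1': ['A-', 'A-', 'A', 'B-', 'C-'],
    'attribute_2': [378.8, 388.8, 908.8, 378.8, 370.8],
    'attribute_3': ['Accept', 'Accept', 'Decline', 'Accept', 'Accept'],
})
check = pd.DataFrame({
    'my_id': ['ABC', 'ABS', 'ABB', 'ABB', 'APP'],
    'parent_id': ['DEF', 'DES', 'DEG', 'DEG', 'DRE'],
    'Attribute_1': ['A-', 'A-', 'A', 'C-', 'C-'],
    'attribute2': [478.8, 388.8, 908.8, 378.8, 370.8],
    'attr_3': ['Decline', 'Accept', 'Accept', 'Accept', 'Accept'],
})

# The differently named columns that should hold the same content
cols_bench = ['attribute_1', 'attribute_2', 'attribute_3']
cols_check = ['Attribute_1', 'attribute2', 'attr_3']

# Element-wise comparison of the attribute columns (rows assumed aligned)
mismatch = bench[cols_bench].values != check[cols_check].values

# Collect, per row, the names of the columns that differ
check['faulty_attr'] = [
    [c for c, bad in zip(cols_check, row) if bad] for row in mismatch
]
faulty_rows = check[check['faulty_attr'].map(len) > 0]
print(faulty_rows)
```

This relies on positional alignment of the rows; if the frames are not guaranteed to be in the same order, they first need to be sorted or joined on a key, which is what the answers below address.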

zaq34kh6

This is a bit involved in PySpark; see the code below and the logic in the comments.

from pyspark.sql import Window
from pyspark.sql.functions import array, col, expr, last, lit, row_number, split

# Create an index. This can be avoided if you have a unique key.
w = Window.partitionBy().orderBy('my_id', 'parent_id')
df1 = df1.withColumn('index', row_number().over(w))
df = (df.withColumn('index', row_number().over(w))
        # Rename columns to match df1
        .withColumnRenamed('attribute_1', 'Attribute_1')
        .withColumnRenamed('attribute_2', 'attribute2')
        .withColumnRenamed('attribute_3', 'attr_3'))

s = (df1.drop('attribute_5').unionByName(df.drop('attribute_4'))   # Union the two dfs
     .orderBy('my_id', 'parent_id', 'index')                       # and sort by index
     .withColumn('change', array('Attribute_1', 'attribute2', 'attr_3'))    # Array of the columns being investigated
     .withColumn('cols', split(lit('Attribute_1,attribute2,attr_3'), ','))  # Array of the column names being investigated
     .withColumn('change1', last('change').over(Window.partitionBy('index').orderBy('my_id', 'parent_id')))  # For every index, put the two versions side by side
     .where(col('change') != col('change1'))                                # Keep rows where the values differ
     .withColumn('change2', expr('transform(change, (c, i) -> change[i] != change1[i])'))  # Boolean flag per column
     .withColumn('faulty_attr', expr('filter(cols, (x, j) -> change2[j])'))                # Keep names of columns that changed
     .drop('index', 'change', 'cols', 'change1', 'change2')                 # Drop helper columns
    )
s.show(truncate=False)

+-----+---------+-----------+----------+-------+--------------------+
|my_id|parent_id|Attribute_1|attribute2|attr_3 |faulty_attr         |
+-----+---------+-----------+----------+-------+--------------------+
|ABB  |DEG      |A          |908.8     |Accept |[attr_3]            |
|ABB  |DEG      |C-         |378.8     |Accept |[Attribute_1]       |
|ABC  |DEF      |A-         |478.8     |Decline|[attribute2, attr_3]|
+-----+---------+-----------+----------+-------+--------------------+
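The `transform`/`filter` pair at the heart of that pipeline is easiest to see outside Spark. This plain-Python analogue (values taken from the ABC/DEF row of the sample data) computes the same thing the two higher-order SQL functions do:

```python
cols = ['Attribute_1', 'attribute2', 'attr_3']
change  = ['A-', '478.8', 'Decline']  # the checked row (as strings, like the Spark array)
change1 = ['A-', '378.8', 'Accept']   # the matching benchmark row

# transform(change, (c, i) -> change[i] != change1[i])
change2 = [a != b for a, b in zip(change, change1)]

# filter(cols, (x, j) -> change2[j])
faulty_attr = [c for c, bad in zip(cols, change2) if bad]
print(faulty_attr)  # -> ['attribute2', 'attr_3']
```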
wfveoks0

Assuming A refers to the first dict and B refers to the second:

**Option 1:** compare the DataFrames

df1 = pd.DataFrame.from_dict(A)
df2 = pd.DataFrame.from_dict(B)

diffs = df1.where(df1.values==df2.values).isnull()
df1['faulty_attr'] = diffs.dot(diffs.columns)

resulting output:

      my_id parent_id attribute_1  attribute_2 attribute_3  attribute_4             faulty_attr
    0   ABC       DEF          A-        378.8      Accept        False  attribute_2attribute_3
    1   ABS       DES          A-        388.8      Accept        False
    2   ABB       DEG           A        908.8     Decline         True             attribute_3
    3   ABB       DEG          B-        378.8      Accept        False             attribute_1
    4   APP       DRE          C-        370.8      Accept         True
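A small variation on Option 1, in case you want `faulty_attr` as a list of column names (as in the question's expected output) rather than a concatenated string. This sketch reuses the same `diffs` mask idea on two hypothetical two-row frames:

```python
import pandas as pd

# Hypothetical minimal frames standing in for A and B
df1 = pd.DataFrame({'attribute_1': ['A-', 'B-'], 'attribute_2': [378.8, 378.8]})
df2 = pd.DataFrame({'attribute_1': ['A-', 'C-'], 'attribute_2': [478.8, 378.8]})

# True where the two frames disagree
diffs = df1.where(df1.values == df2.values).isnull()

# For each row, collect the names of the columns flagged as different
df1['faulty_attr'] = diffs.apply(lambda row: list(diffs.columns[row.to_numpy()]), axis=1)
print(df1)
```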

**Option 2:** use DeepDiff

from deepdiff import DeepDiff

print(DeepDiff(A, B, ignore_order=False).pretty())

resulting output:

    Value of root['attribute_1'][3] changed from "B-" to "C-".
    Value of root['attribute_2'][0] changed from 378.8 to 478.8.
    Value of root['attribute_3'][0] changed from "Accept" to "Decline".
    Value of root['attribute_3'][2] changed from "Decline" to "Accept".

**Option 3:** use a DataFrame merge

df1 = pd.DataFrame.from_dict(A)
df2 = pd.DataFrame.from_dict(B)

comparison_df = df1.merge(df2, indicator=True, how='outer')
print(comparison_df[comparison_df['_merge'] == 'right_only'])

resulting output:

      my_id parent_id attribute_1  attribute_2 attribute_3  attribute_4      _merge
    5   ABC       DEF          A-        478.8     Decline        False  right_only
    6   ABB       DEG           A        908.8      Accept         True  right_only
    7   ABB       DEG          C-        378.8      Accept        False  right_only
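Since pandas 1.1 there is also `DataFrame.compare`, which reports per-cell differences directly once the column labels are aligned. A minimal sketch on hypothetical two-row frames (the result uses the default 'self'/'other' labels for the benchmark and checked values):

```python
import pandas as pd

# Hypothetical minimal frames with already-aligned column names
bench = pd.DataFrame({'attribute_1': ['A-', 'B-'], 'attribute_2': [378.8, 378.8]})
check = pd.DataFrame({'attribute_1': ['A-', 'C-'], 'attribute_2': [478.8, 378.8]})

# Only differing cells are kept; equal cells become NaN,
# and rows/columns with no differences are dropped entirely
delta = bench.compare(check)
print(delta)
```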
