如何根据特定的列获取PySpark DataFrame中所有重复记录的示例?

mutmk8jj  于 2022-12-03  发布在  Spark
关注(0)|答案(2)|浏览(123)

我需要在PySpark DataFrame中查找所有重复记录。

# Prepare Data
data = [("A", "A", 1), \
    ("A", "A", 2), \
    ("A", "A", 3), \
    ("A", "B", 4), \
    ("A", "B", 5), \
    ("A", "C", 6), \
    ("A", "D", 7), \
    ("A", "E", 8), \
  ]

# Create DataFrame
columns= ["col_1", "col_2", "col_3"]
df = spark.createDataFrame(data = data, schema = columns)
df.show(truncate=False)

当我尝试以下代码时:

primary_key = ['col_1', 'col_2']
duplicate_records = df.exceptAll(df.dropDuplicates(primary_key))
duplicate_records.show()

输出将为:

正如您所看到的,我没有根据主键获取所有重复记录,因为“df.dropDuplicates(primary_key)"中存在一个重复记录示例。数据集的第1条和第4条记录必须在输出中。
有什么办法解决这个问题吗?

zsohkypk

zsohkypk1#

你不能看到第一和第四条记录的原因是dropduplicate保留每个重复项中的一个。请参见下面的代码:

primary_key = ['col_1', 'col_2']
df.dropDuplicates(primary_key).show()
+-----+-----+-----+
|col_1|col_2|col_3|
+-----+-----+-----+
|    A|    A|    1|
|    A|    B|    4|
|    A|    C|    6|
|    A|    D|    7|
|    A|    E|    8|
+-----+-----+-----+

对于您的任务,您可以提取重复的密钥,并将其与主 Dataframe 连接:

duplicated_keys = (
    df
    .groupby(primary_key)
    .count()
    .filter(F.col('count') > 1)
    .drop(F.col('count'))
)

(
    df
    .join(F.broadcast(duplicated_keys), primary_key)
).show()
+-----+-----+-----+-----+
|col_1|col_2|col_3|count|
+-----+-----+-----+-----+
|    A|    A|    1|    3|
|    A|    A|    2|    3|
|    A|    A|    3|    3|
|    A|    B|    4|    2|
|    A|    B|    5|    2|
+-----+-----+-----+-----+
t2a7ltrp

t2a7ltrp2#

这是我的2分钱
我们可以使用Window函数来实现这一点
1.创建 Dataframe :

data = [("A", "A", 1), \
 ("A", "A", 2), \
 ("A", "A", 3), \
 ("A", "B", 4), \
 ("A", "B", 5), \
 ("A", "C", 6), \
 ("A", "D", 7), \
 ("A", "E", 8), \
 ]
 columns= ["col_1", "col_2", "col_3"]
 df = spark.createDataFrame(data = data, schema = columns)
 df.show(truncate=False)

1.在主键的顶部使用Window函数计算计数,并仅提取计数大于1的那些行,然后删除计数列。

primary_key = ['col_1', 'col_2']
 windowSpec = Window.partitionBy(primary_key).orderBy(primary_key)

 df.withColumn('CountColumns',count('*').over(windowSpec)).filter('CountColumns>1').drop('CountColumns').show()

请查看以下图片以供参考:

相关问题