PySpark: identify duplicate and non-duplicate records in a DataFrame

cidc1ykv · posted 2023-10-15 in Spark

I have a DataFrame that contains both duplicate and distinct records. I need to identify which records are duplicates and which are distinct, and split them into two separate DataFrames.
Input:

custid | cust_name | loc | prodid
1234   | John      | US  | P133
1234   | John      | US  | P133
1234   | John      | US  | P133
5678   | Mike      | CHN | P456
4325   | Peter     | RUS | P247
3458   | Andy      | IND | P764 
3458   | Andy      | IND | P764

Output: DF1 (duplicates):

custid | cust_name | loc | prodid
1234   | John      | US  | P133
1234   | John      | US  | P133
1234   | John      | US  | P133
3458   | Andy      | IND | P764 
3458   | Andy      | IND | P764

DF2 (non-duplicates):

custid | cust_name | loc | prodid
5678   | Mike      | CHN | P456
4325   | Peter     | RUS | P247

Can anyone help?

5ssjco0h #1

Create a window spec to count the rows in each group, check for rows whose count is greater than 1 to build a boolean flag has_dupes, and then use this flag to filter the two subsets.

import pyspark.sql.functions as F
from pyspark.sql import Window

# count how many times each full row occurs
W = Window.partitionBy(*df.columns)
df = df.withColumn('has_dupes', F.count(F.lit(1)).over(W) > 1)

# split on the boolean flag
df_dupes = df.filter('has_dupes')
df_nodupes = df.filter('not has_dupes')
df_nodupes.show()
+------+---------+---+------+---------+
|custid|cust_name|loc|prodid|has_dupes|
+------+---------+---+------+---------+
|  4325|    Peter|RUS|  P247|    false|
|  5678|     Mike|CHN|  P456|    false|
+------+---------+---+------+---------+

df_dupes.show()
+------+---------+---+------+---------+
|custid|cust_name|loc|prodid|has_dupes|
+------+---------+---+------+---------+
|  1234|     John| US|  P133|     true|
|  1234|     John| US|  P133|     true|
|  1234|     John| US|  P133|     true|
|  3458|     Andy|IND|  P764|     true|
|  3458|     Andy|IND|  P764|     true|
+------+---------+---+------+---------+
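
If the helper column should not appear in the final output, it can be dropped after the split. A small follow-up sketch using the df_dupes and df_nodupes frames from above:

# drop the boolean helper so the output matches the requested schema
df_dupes = df_dupes.drop('has_dupes')
df_nodupes = df_nodupes.drop('has_dupes')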

kgsdhlau #2

Using Spark SQL

df.createOrReplaceTempView("df_main")

df_non_dup = spark.sql("""select * from df_main where custid in (select custid from df_main group by custid having count(*) = 1)""")

df_dup = spark.sql("""select * from df_main where custid not in (select custid from df_main group by custid having count(*) = 1)""")
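
Note that these queries treat any repeated custid as a duplicate. If a duplicate should mean that the entire row repeats, a hedged variant is to count over all columns with a window function in SQL (a sketch reusing the df_main view):

df_dup = spark.sql("""
    select custid, cust_name, loc, prodid
    from (
        select *, count(*) over (partition by custid, cust_name, loc, prodid) as cnt
        from df_main
    ) t
    where cnt > 1
""")

df_non_dup = spark.sql("""
    select custid, cust_name, loc, prodid
    from (
        select *, count(*) over (partition by custid, cust_name, loc, prodid) as cnt
        from df_main
    ) t
    where cnt = 1
""")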

Using the DataFrame API (with a pandas round-trip)

# keep one copy of each distinct row
df1 = df.select('*').distinct()

# pandas round-trip intended to remove the distinct rows from the original
pandasDF1 = df1.toPandas().astype(str)
pandasDF = df.toPandas().astype(str)
pandasDF2 = pandasDF[~pandasDF.isin(pandasDF1)].dropna()
df2 = spark.createDataFrame(pandasDF2)
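
As a caveat, isin on a pandas DataFrame aligns element-wise by index and column labels, so the round-trip above can be fragile. A minimal sketch of the same split that stays in Spark, assuming a duplicate means the whole row repeats:

import pyspark.sql.functions as F

# count how many times each full row occurs
counted = df.groupBy(df.columns).count()

# rows that occur exactly once are the non-duplicates
df_nodupes = counted.filter(F.col("count") == 1).drop("count")
# join back to keep every copy of the rows that occur more than once
df_dupes = df.join(counted.filter(F.col("count") > 1).drop("count"), on=df.columns, how="inner")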

bqf10yzr #3

Here is a simple approach. Group by all the columns and take a count for each row. Then filter on that count to create two DataFrames, one with the duplicated customer IDs and one with the unique customer IDs, and join each of them back to the original DataFrame.

import pyspark.sql.functions as F
from pyspark import SparkContext, SQLContext

sc = SparkContext('local')
sqlContext = SQLContext(sc)
### This is very important setting if you want legacy behaviour
sqlContext.setConf("spark.sql.legacy.timeParserPolicy", "LEGACY")

data1 = [
    [1234, "John", "US", "P133"],
    [1234, "John", "US", "P133"],
    [1234, "John", "US", "P133"],
    [5678, "Mike", "CHN", "P456"],
    [4325, "Peter", "RUS", "P247"],
    [3458, "Andy", "IND", "P764"],
    [3458, "Andy", "IND", "P764"],

]


df1Columns = ["custid","cust_name","loc","prodid"]
df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)


df1.show(n=100, truncate=False)

df1_count = df1.groupby(["custid","cust_name","loc","prodid"]).agg(F.count("*").alias("count"))
df1_count.show(n=100, truncate=False)

df1_select = df1_count.select("custid", "count")
df1_select.show(n=100, truncate=False)

df1_more_than_one = df1_select.filter(F.col("count") > 1)
df1_exactly_one = df1_select.filter(F.col("count") == 1)

df1_more_than_one.show(n=100, truncate=False)
df1_exactly_one.show(n=100, truncate=False)

df1_duplicates = df1.join(df1_more_than_one, on=["custid"]).drop("count")
df1_non_duplicates = df1.join(df1_exactly_one, on=["custid"]).drop("count")

print("Duplicates dataframe below")
df1_duplicates.show(n=100, truncate=False)
print("Non duplicates dataframe below")
df1_non_duplicates.show(n=100, truncate=False)

Output:

+------+---------+---+------+
|custid|cust_name|loc|prodid|
+------+---------+---+------+
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|5678  |Mike     |CHN|P456  |
|4325  |Peter    |RUS|P247  |
|3458  |Andy     |IND|P764  |
|3458  |Andy     |IND|P764  |
+------+---------+---+------+

+------+---------+---+------+-----+
|custid|cust_name|loc|prodid|count|
+------+---------+---+------+-----+
|5678  |Mike     |CHN|P456  |1    |
|1234  |John     |US |P133  |3    |
|3458  |Andy     |IND|P764  |2    |
|4325  |Peter    |RUS|P247  |1    |
+------+---------+---+------+-----+

+------+-----+
|custid|count|
+------+-----+
|5678  |1    |
|1234  |3    |
|3458  |2    |
|4325  |1    |
+------+-----+

+------+-----+
|custid|count|
+------+-----+
|1234  |3    |
|3458  |2    |
+------+-----+

+------+-----+
|custid|count|
+------+-----+
|5678  |1    |
|4325  |1    |
+------+-----+

Duplicates dataframe below
+------+---------+---+------+
|custid|cust_name|loc|prodid|
+------+---------+---+------+
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|1234  |John     |US |P133  |
|3458  |Andy     |IND|P764  |
|3458  |Andy     |IND|P764  |
+------+---------+---+------+

Non duplicates dataframe below
+------+---------+---+------+
|custid|cust_name|loc|prodid|
+------+---------+---+------+
|5678  |Mike     |CHN|P456  |
|4325  |Peter    |RUS|P247  |
+------+---------+---+------+

jum4pzuy #4

import pyspark.sql.functions as f

# df_basket1 is the input DataFrame (df in the question); attach a flag marking rows that repeat
df_with_flag = df_basket1.join(
    df_basket1.groupBy(df_basket1.columns)
              .agg((f.count("*") > 1).cast("int").alias("Duplicate_indicator")),
    on=df_basket1.columns,
    how="inner"
)
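
The join above only attaches a flag. A minimal follow-up sketch (assuming the df_with_flag name from the block above) to split it into the two requested DataFrames and drop the helper column:

df_dupes = df_with_flag.filter("Duplicate_indicator = 1").drop("Duplicate_indicator")
df_nodupes = df_with_flag.filter("Duplicate_indicator = 0").drop("Duplicate_indicator")

df_dupes.show()
df_nodupes.show()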
