pyspark 根据分组确定重复项

a64a0gku  于 2023-10-15  发布在  Spark
关注(0)|答案(1)|浏览(118)

我需要根据给定的条件识别重复项,并根据条件筛选出记录:-
我们需要首先根据活动名称对记录进行分组,然后根据其余列查找重复项。
广告名称可以分为3部分-{prefix + keyword + suffix}。关键词是-“鞋”,“服装”和“手表”。因此,例如:活动名称“Gen_X_Shoes_TTU_Bing”表示关键字为“shoes”,前缀为“Gen_X”,后缀为“TTU_bing"。
如果这些分组在任何记录之间都是相同的,并且其余字段(广告商等)也是相同的,我们认为它是重复的。
一旦识别出重复项,我们将根据关键字的优先级从重复组中只保留1条记录。
优先级:

1) Shoes
2) Apparel
3) Watch

考虑以下情况:

adverstiser | shopid | campaign               | customerid
ABC1        | XYZ1   | Gen_X_Shoes_TTU_Bing   | cust123
ABC1        | XYZ1   | Gen_X_Apparel_TTU_Bing | cust123
ABC2        | XYZ2   | Zen_X_Watch_SKU_Gogl   | cust456
ABC2        | XYZ2   | Zen_X_Apparel_SKU_Gogl | cust456
ABC3        | XYZ3   | Gen_Z_Watch_INL_Gogl   | cust567

考虑到第一和第二记录,包含关键字“shoes”的活动的前缀是“Gen_X_”,后缀是“TTU_bing”,它们彼此完全匹配,并且所有其他列也相同。因此,这2个记录可以被认为是重复的,并且输出将是第一个记录,因为它包含“Shoes”,其优先级高于关键字“Apparel”。
类似地,考虑第3和第4条记录,活动的前缀是“Zen_X”,后缀是“Zen_Gogl”,这两条记录匹配,其他列也相同。因此,这2个记录可以被认为是重复的,并且输出将是包含关键字“Apparel”的记录。
最后,第5条记录没有找到任何匹配,因此将按原样输出。
输出量:

adverstiser | shopid | campaign               | customerid
ABC1        | XYZ1   | Gen_X_Shoes_TTU_Bing   | cust123
ABC2        | XYZ2   | Zen_X_Apparel_SKU_Gogl | cust456
ABC3        | XYZ3   | Gen_Z_Watch_INL_Gogl   | cust567

要求二:
在输出中选择的每个记录都需要复制一个额外的列“action”,在每个记录中具有值“set”和“ready”。谁能帮帮我。如果需要的话,我很乐意提供更多的清晰度。

c3frrgcw

c3frrgcw1#

您可以创建包含关键字(使用regex_extract()提取)和后缀/前缀组合(使用regex_replace()派生)的列,然后使用窗口函数选择每组的第一行。请注意,为了以正确的顺序获得关键字,我将其替换为“1”,“2”,“3”。

from pyspark.sql import functions as F
from pyspark.sql.window import Window

(
    df
    .withColumn("suffpre", F.regexp_replace(F.col("campaign"), "Shoes|Apparel|Watch", ""))
    .withColumn("keyword", F.regexp_extract(F.col("campaign"), "Shoes|Apparel|Watch", idx=0))
    .withColumn("keyword", F.when(F.col("keyword")=="Shoes","1").when(F.col("keyword")=="Apparel","2").otherwise("3"))
    .withColumn("r", F.row_number().over(
        Window
        .partitionBy(["advertiser", "shopid","customerid", "suffpre"])
        .orderBy(F.col("keyword"))))
    .filter(F.col("r")==1)
    .drop("r", "suffpre", "keyword")
).show()

输出量:

|advertiser|shopid|            campaign|customerid|
+----------+------+--------------------+----------+
|      ABC1|  XYZ1|Gen_X_Shoes_TTU_Bing|   cust123|
|      ABC2|  XYZ2|Zen_X_Apparel_SKU...|   cust456|
|      ABC3|  XYZ3|Gen_Z_Watch_INL_Gogl|   cust567|
+----------+------+--------------------+----------+

如果要复制此记录集,添加具有值setready的指示器列action,可以执行以下操作。
假设k是上面的结果:

(
    k
    .withColumn("action", F.lit("set"))
    .unionAll(
        k.withColumn("action", F.lit("ready"))
    )
).show()

输出量:

|advertiser|shopid|            campaign|customerid|action|
+----------+------+--------------------+----------+------+
|      ABC1|  XYZ1|Gen_X_Shoes_TTU_Bing|   cust123|   set|
|      ABC2|  XYZ2|Zen_X_Apparel_SKU...|   cust456|   set|
|      ABC3|  XYZ3|Gen_Z_Watch_INL_Gogl|   cust567|   set|
|      ABC1|  XYZ1|Gen_X_Shoes_TTU_Bing|   cust123| ready|
|      ABC2|  XYZ2|Zen_X_Apparel_SKU...|   cust456| ready|
|      ABC3|  XYZ3|Gen_Z_Watch_INL_Gogl|   cust567| ready|
+----------+------+--------------------+----------+------+

相关问题