scala 删除Spark Dataframe 中重复的所有记录

zu0ti5jz  于 2022-11-09  发布在  Scala
关注(0)|答案(3)|浏览(375)

我有一个包含多列的Spark Dataframe 。我想找出并删除在一列中具有重复值的行(其他列可以不同)。
我尝试使用dropDuplicates(col_name),但它只删除重复的条目,但仍在 Dataframe 中保留一条记录。我需要的是删除最初包含重复条目的所有条目。
我用的是Spark 1.6和Scala 2.10。

06odsfpq

06odsfpq1#

为此,我会使用窗口函数。假设您想要删除重复的id行:

import org.apache.spark.sql.expressions.Window

df
  .withColumn("cnt", count("*").over(Window.partitionBy($"id")))
  .where($"cnt"===1).drop($"cnt")
  .show()
mfuanj7w

mfuanj7w2#

这可以通过按列分组以在其中查找重复项,然后聚合和过滤结果来实现。
示例 Dataframe df

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
|  4|  4|
|  4|  5|
+---+---+

id列分组以删除其重复项(最后两行):

val df2 = df.groupBy("id")
  .agg(first($"num").as("num"), count($"id").as("count"))
  .filter($"count" === 1)
  .select("id", "num")

这将为您提供:

+---+---+
| id|num|
+---+---+
|  1|  1|
|  2|  2|
|  3|  3|
+---+---+

或者,也可以使用join来完成。它会更慢,但如果有很多列,则不需要为每个列使用first($"num").as("num")来保留它们。

val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id")
val df3 = df.join(df2, Seq("id"), "inner")
g0czyy6m

g0czyy6m3#

我向使用@Raphael Roth解决方案的开放源码spark-daria库添加了一个killDuplicates()方法。以下是代码的使用方法:

import com.github.mrpowers.spark.daria.sql.DataFrameExt._

df.killDuplicates(col("id"))

// you can also supply multiple Column arguments
df.killDuplicates(col("id"), col("another_column"))

以下是代码实现:

object DataFrameExt {

  implicit class DataFrameMethods(df: DataFrame) {

    def killDuplicates(cols: Column*): DataFrame = {
      df
        .withColumn(
          "my_super_secret_count",
          count("*").over(Window.partitionBy(cols: _*))
        )
        .where(col("my_super_secret_count") === 1)
        .drop(col("my_super_secret_count"))
    }

  }

}

您可能希望利用Spark-Daria库将此逻辑排除在代码库之外。

相关问题