我有一个包含多列的Spark Dataframe 。我想找出并删除在一列中具有重复值的行(其他列可以不同)。我尝试使用dropDuplicates(col_name),但它只删除重复的条目,但仍在 Dataframe 中保留一条记录。我需要的是删除最初包含重复条目的所有条目。我用的是Spark 1.6和Scala 2.10。
dropDuplicates(col_name)
06odsfpq1#
为此,我会使用窗口函数。假设您想要删除重复的id行:
id
import org.apache.spark.sql.expressions.Window df .withColumn("cnt", count("*").over(Window.partitionBy($"id"))) .where($"cnt"===1).drop($"cnt") .show()
mfuanj7w2#
这可以通过按列分组以在其中查找重复项,然后聚合和过滤结果来实现。示例 Dataframe df:
df
+---+---+ | id|num| +---+---+ | 1| 1| | 2| 2| | 3| 3| | 4| 4| | 4| 5| +---+---+
按id列分组以删除其重复项(最后两行):
val df2 = df.groupBy("id") .agg(first($"num").as("num"), count($"id").as("count")) .filter($"count" === 1) .select("id", "num")
这将为您提供:
+---+---+ | id|num| +---+---+ | 1| 1| | 2| 2| | 3| 3| +---+---+
或者,也可以使用join来完成。它会更慢,但如果有很多列,则不需要为每个列使用first($"num").as("num")来保留它们。
join
first($"num").as("num")
val df2 = df.groupBy("id").agg(count($"id").as("count")).filter($"count" === 1).select("id") val df3 = df.join(df2, Seq("id"), "inner")
g0czyy6m3#
我向使用@Raphael Roth解决方案的开放源码spark-daria库添加了一个killDuplicates()方法。以下是代码的使用方法:
killDuplicates()
import com.github.mrpowers.spark.daria.sql.DataFrameExt._ df.killDuplicates(col("id")) // you can also supply multiple Column arguments df.killDuplicates(col("id"), col("another_column"))
以下是代码实现:
object DataFrameExt { implicit class DataFrameMethods(df: DataFrame) { def killDuplicates(cols: Column*): DataFrame = { df .withColumn( "my_super_secret_count", count("*").over(Window.partitionBy(cols: _*)) ) .where(col("my_super_secret_count") === 1) .drop(col("my_super_secret_count")) } } }
您可能希望利用Spark-Daria库将此逻辑排除在代码库之外。
3条答案
按热度按时间06odsfpq1#
为此,我会使用窗口函数。假设您想要删除重复的
id
行:mfuanj7w2#
这可以通过按列分组以在其中查找重复项,然后聚合和过滤结果来实现。
示例 Dataframe
df
:按
id
列分组以删除其重复项(最后两行):这将为您提供:
或者,也可以使用
join
来完成。它会更慢,但如果有很多列,则不需要为每个列使用first($"num").as("num")
来保留它们。g0czyy6m3#
我向使用@Raphael Roth解决方案的开放源码spark-daria库添加了一个
killDuplicates()
方法。以下是代码的使用方法:以下是代码实现:
您可能希望利用Spark-Daria库将此逻辑排除在代码库之外。