假设我有一个相当大的数据集,其形式如下:
data = sc.parallelize([('Foo',41,'US',3),
('Foo',39,'UK',1),
('Bar',57,'CA',2),
('Bar',72,'CA',2),
('Baz',22,'US',6),
('Baz',36,'US',6)])
我想做的是只根据第一、第三和第四列的值删除重复的行。
删除完全重复的行非常简单:
data = data.distinct()
并且行5或行6将被移除
但是,如何仅删除基于列1、3和4的重复行?即,删除以下任一行:
('Baz',22,'US',6)
('Baz',36,'US',6)
在Python中,这可以通过用.drop_duplicates()
指定列来实现。我如何在Spark/Pyspark中实现同样的功能呢?
7条答案
按热度按时间zqdjd7g91#
Pyspark * 包含
dropDuplicates()
方法,该方法在1.4中引入。https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html9avjhtql2#
从您的问题来看,您并不清楚要使用哪些数据行来判断重复项目。此解决方案的一般想法是根据识别重复项目的数据行值建立索引键。然后,您可以使用reduceByKey或reduce作业来消除重复项目。
下面是一些代码,可以帮助您开始:
现在,您有了一个键值
RDD
,它由列1、3和4键控。下一步将是reduceByKey
或groupByKey
和filter
。这将消除重复。5ktev3wc3#
我知道您已经接受了另一个答案,但是如果您希望以DataFrame的形式执行此操作,只需使用groupBy和agg。假设您已经创建了一个DF(包含名为“col1”、“col2”等的列),您可以执行以下操作:
请注意,在本例中,我选择了col2的Max,但您可以选择avg、min等。
hujrc8aj4#
同意大卫的观点。另外,如果我们想groupBy除聚合函数中的列之外的所有列,也就是说,如果我们想完全基于列的子集删除重复项,并保留原始 Dataframe 中的所有列,那么 * 可能(不是 * 这种情况。因此,更好的方法是使用Spark 1.4.0中提供的dropDuplicatesDataframe api
有关参考,请参见:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame
70gysomp5#
我使用了内置函数dropDuplicates()。下面给出了Scala代码
输出量:
tmb3ates6#
下面的程序将帮助您删除整个重复项,或者如果您想删除基于某些列的重复项,您甚至可以这样做:
jdg4fx2g7#
这是我的Df包含4是重复两次,所以这里将删除重复的值。