基于RDD/Spark DataFrame中的特定列删除行中的重复项

31moq8wy  于 2022-11-16  发布在  Apache
关注(0)|答案(7)|浏览(261)

假设我有一个相当大的数据集,其形式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])

我想做的是只根据第一、第三和第四列的值删除重复的行。
删除完全重复的行非常简单:

data = data.distinct()

并且行5或行6将被移除
但是,如何仅删除基于列1、3和4的重复行?即,删除以下任一行:

('Baz',22,'US',6)
('Baz',36,'US',6)

在Python中,这可以通过用.drop_duplicates()指定列来实现。我如何在Spark/Pyspark中实现同样的功能呢?

zqdjd7g9

zqdjd7g91#

Pyspark * 包含dropDuplicates()方法,该方法在1.4中引入。https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.dropDuplicates.html

>>> from pyspark.sql import Row
>>> df = sc.parallelize([ \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=5, height=80), \
...     Row(name='Alice', age=10, height=80)]).toDF()
>>> df.dropDuplicates().show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+

>>> df.dropDuplicates(['name', 'height']).show()
+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+
9avjhtql

9avjhtql2#

从您的问题来看,您并不清楚要使用哪些数据行来判断重复项目。此解决方案的一般想法是根据识别重复项目的数据行值建立索引键。然后,您可以使用reduceByKey或reduce作业来消除重复项目。
下面是一些代码,可以帮助您开始:

def get_key(x):
    return "{0}{1}{2}".format(x[0],x[2],x[3])

m = data.map(lambda x: (get_key(x),x))

现在,您有了一个键值RDD,它由列1、3和4键控。下一步将是reduceByKeygroupByKeyfilter。这将消除重复。

r = m.reduceByKey(lambda x,y: (x))
5ktev3wc

5ktev3wc3#

我知道您已经接受了另一个答案,但是如果您希望以DataFrame的形式执行此操作,只需使用groupBy和agg。假设您已经创建了一个DF(包含名为“col1”、“col2”等的列),您可以执行以下操作:

myDF.groupBy($"col1", $"col3", $"col4").agg($"col1", max($"col2"), $"col3", $"col4")

请注意,在本例中,我选择了col2的Max,但您可以选择avg、min等。

hujrc8aj

hujrc8aj4#

同意大卫的观点。另外,如果我们想groupBy除聚合函数中的列之外的所有列,也就是说,如果我们想完全基于列的子集删除重复项,并保留原始 Dataframe 中的所有列,那么 * 可能(不是 * 这种情况。因此,更好的方法是使用Spark 1.4.0中提供的dropDuplicatesDataframe api
有关参考,请参见:https://spark.apache.org/docs/1.4.0/api/scala/index.html#org.apache.spark.sql.DataFrame

70gysomp

70gysomp5#

我使用了内置函数dropDuplicates()。下面给出了Scala代码

val data = sc.parallelize(List(("Foo",41,"US",3),
("Foo",39,"UK",1),
("Bar",57,"CA",2),
("Bar",72,"CA",2),
("Baz",22,"US",6),
("Baz",36,"US",6))).toDF("x","y","z","count")

data.dropDuplicates(Array("x","count")).show()

输出量:

+---+---+---+-----+
|  x|  y|  z|count|
+---+---+---+-----+
|Baz| 22| US|    6|
|Foo| 39| UK|    1|
|Foo| 41| US|    3|
|Bar| 57| CA|    2|
+---+---+---+-----+
tmb3ates

tmb3ates6#

下面的程序将帮助您删除整个重复项,或者如果您想删除基于某些列的重复项,您甚至可以这样做:

import org.apache.spark.sql.SparkSession

object DropDuplicates {
def main(args: Array[String]) {
val spark =
  SparkSession.builder()
    .appName("DataFrame-DropDuplicates")
    .master("local[4]")
    .getOrCreate()

import spark.implicits._

// create an RDD of tuples with some data
val custs = Seq(
  (1, "Widget Co", 120000.00, 0.00, "AZ"),
  (2, "Acme Widgets", 410500.00, 500.00, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (4, "Widgets R Us", 410500.00, 0.0, "CA"),
  (3, "Widgetry", 410500.00, 200.00, "CA"),
  (5, "Ye Olde Widgete", 500.00, 0.0, "MA"),
  (6, "Widget Co", 12000.00, 10.00, "AZ")
)
val customerRows = spark.sparkContext.parallelize(custs, 4)

// convert RDD of tuples to DataFrame by supplying column names
val customerDF = customerRows.toDF("id", "name", "sales", "discount", "state")

println("*** Here's the whole DataFrame with duplicates")

customerDF.printSchema()

customerDF.show()

// drop fully identical rows
val withoutDuplicates = customerDF.dropDuplicates()

println("*** Now without duplicates")

withoutDuplicates.show()

val withoutPartials = customerDF.dropDuplicates(Seq("name", "state"))

println("*** Now without partial duplicates too")

withoutPartials.show()

 }
 }
jdg4fx2g

jdg4fx2g7#

这是我的Df包含4是重复两次,所以这里将删除重复的值。

scala> df.show
+-----+
|value|
+-----+
|    1|
|    4|
|    3|
|    5|
|    4|
|   18|
+-----+

scala> val newdf=df.dropDuplicates

scala> newdf.show
+-----+
|value|
+-----+
|    1|
|    3|
|    5|
|    4|
|   18|
+-----+

相关问题