pyspark:过滤联合的重复项,只保留groupby行中指定列的最大值

jgzswidk  于 2021-05-17  发布在  Spark
关注(0)|答案(2)|浏览(755)

我想创建一个dataframe,它包含两个dataframe中的所有行,如果有重复项,我们只保留一个列的最大值的行。
例如,如果我们有两个具有相同模式的表,如下面所示,我们将合并到一个表中,该表只包含由另一列分组的行组中具有最大列值(最高分数)的行(在下面的示例中为“name”)。

Table A
+--------------------------+
| name   | source  | score |
+--------+---------+-------+
| Finch  | Acme    | 62    |
| Jones  | Acme    | 30    |
| Lewis  | Acme    | 59    |
| Smith  | Acme    | 98    |
| Starr  | Acme    | 87    |
+--------+---------+-------+

Table B
+--------------------------+
| name   | source  | score |
+--------+---------+-------+
| Bryan  | Beta    | 93    |
| Jones  | Beta    | 75    |
| Lewis  | Beta    | 59    |
| Smith  | Beta    | 64    |
| Starr  | Beta    | 81    |
+--------+---------+-------+

Final Table
+--------------------------+
| name   | source  | score |
+--------+---------+-------+
| Bryan  | Beta    | 93    |
| Finch  | Acme    | 62    |
| Jones  | Beta    | 75    |
| Lewis  | Acme    | 59    |
| Smith  | Acme    | 98    |
| Starr  | Acme    | 87    |
+--------+---------+-------+

以下是似乎有效的方法:

from pyspark.sql import functions as F

schema = ["name", "source", "score"]

rows1 = [("Smith", "Acme", 98),
         ("Jones", "Acme", 30),
         ("Finch", "Acme", 62),
         ("Lewis", "Acme", 59),
         ("Starr", "Acme", 87)]

rows2 = [("Smith", "Beta", 64),
         ("Jones", "Beta", 75),
         ("Bryan", "Beta", 93),
         ("Lewis", "Beta", 59),
         ("Starr", "Beta", 81)]

df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)

df_union = df1.unionAll(df2)
df_agg = df_union.groupBy("name").agg(F.max("score").alias("score"))
df_final = df_union.join(df_agg, on="score", how="leftsemi").orderBy("name", F.col("score").desc()).dropDuplicates(["name"])

以上结果是我所期望的Dataframe。这似乎是一个复杂的方式来做这件事,但我不知道,因为我是相对新的Spark。这能以一种更有效、更优雅或更“Python式”的方式来实现吗?

2nc8po8w

2nc8po8w1#

我看不出你的答案有什么错,除了最后一行——你不能只在分数上加入,但需要在“name”和“score”的组合上加入,你可以选择inner join,这样就不需要删除同名分数较低的行:

df_final = (df_union.join(df_agg, on=["name", "score"], how="inner")
                    .orderBy("name")
                    .dropDuplicates(["name"]))

请注意,不需要按分数排序,只有当您希望避免为name=lewis显示两行时才需要.dropduplicates([“name”]),因为name=lewis在两个Dataframe中的分数相同。

flvlnr44

flvlnr442#

可以使用窗口函数。分区依据 name 选择最高的记录 score .

from pyspark.sql.functions import *
from pyspark.sql.window import Window

w=Window().partitionBy("name").orderBy(desc("score"))

df_union.withColumn("rank", row_number().over(w))\
        .filter(col("rank")==1).drop("rank").show()

+-----+------+-----+                                                            
| name|source|score|
+-----+------+-----+
|Bryan|  Beta|   93|
|Finch|  Acme|   62|
|Jones|  Beta|   75|
|Lewis|  Acme|   59|
|Smith|  Acme|   98|
|Starr|  Acme|   87|
+-----+------+-----+

相关问题