我想创建一个dataframe,它包含两个dataframe中的所有行,如果有重复项,我们只保留一个列的最大值的行。
例如,如果我们有两个具有相同模式的表,如下面所示,我们将合并到一个表中,该表只包含由另一列分组的行组中具有最大列值(最高分数)的行(在下面的示例中为“name”)。
Table A
+--------------------------+
| name | source | score |
+--------+---------+-------+
| Finch | Acme | 62 |
| Jones | Acme | 30 |
| Lewis | Acme | 59 |
| Smith | Acme | 98 |
| Starr | Acme | 87 |
+--------+---------+-------+
Table B
+--------------------------+
| name | source | score |
+--------+---------+-------+
| Bryan | Beta | 93 |
| Jones | Beta | 75 |
| Lewis | Beta | 59 |
| Smith | Beta | 64 |
| Starr | Beta | 81 |
+--------+---------+-------+
Final Table
+--------------------------+
| name | source | score |
+--------+---------+-------+
| Bryan | Beta | 93 |
| Finch | Acme | 62 |
| Jones | Beta | 75 |
| Lewis | Acme | 59 |
| Smith | Acme | 98 |
| Starr | Acme | 87 |
+--------+---------+-------+
以下是似乎有效的方法:
from pyspark.sql import functions as F
schema = ["name", "source", "score"]
rows1 = [("Smith", "Acme", 98),
("Jones", "Acme", 30),
("Finch", "Acme", 62),
("Lewis", "Acme", 59),
("Starr", "Acme", 87)]
rows2 = [("Smith", "Beta", 64),
("Jones", "Beta", 75),
("Bryan", "Beta", 93),
("Lewis", "Beta", 59),
("Starr", "Beta", 81)]
df1 = spark.createDataFrame(rows1, schema)
df2 = spark.createDataFrame(rows2, schema)
df_union = df1.unionAll(df2)
df_agg = df_union.groupBy("name").agg(F.max("score").alias("score"))
df_final = df_union.join(df_agg, on="score", how="leftsemi").orderBy("name", F.col("score").desc()).dropDuplicates(["name"])
以上结果是我所期望的Dataframe。这似乎是一个复杂的方式来做这件事,但我不知道,因为我是相对新的Spark。这能以一种更有效、更优雅或更“Python式”的方式来实现吗?
2条答案
按热度按时间2nc8po8w1#
我看不出你的答案有什么错,除了最后一行——你不能只在分数上加入,但需要在“name”和“score”的组合上加入,你可以选择inner join,这样就不需要删除同名分数较低的行:
请注意,不需要按分数排序,只有当您希望避免为name=lewis显示两行时才需要.dropduplicates([“name”]),因为name=lewis在两个Dataframe中的分数相同。
flvlnr442#
可以使用窗口函数。分区依据
name
选择最高的记录score
.