我生成了一个DataFrame,如下所示:
df.groupBy($"Hour", $"Category")
.agg(sum($"value") as "TotalValue")
.sort($"Hour".asc, $"TotalValue".desc))
结果如下:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 0| cat13| 22.1|
| 0| cat95| 19.6|
| 0| cat105| 1.3|
| 1| cat67| 28.5|
| 1| cat4| 26.8|
| 1| cat13| 12.6|
| 1| cat23| 5.3|
| 2| cat56| 39.6|
| 2| cat40| 29.7|
| 2| cat187| 27.9|
| 2| cat68| 9.8|
| 3| cat8| 35.6|
| ...| ....| ....|
+----+--------+----------+
正如您所看到的,DataFrame是按Hour
的升序排序,然后按TotalValue
的降序排序。
我想选择每组的第一行,即
- 从小时==0组中选择(0,cat26,30.9)
- 从“小时==1”组中选择(1,cat67,28.5)
- 从“小时==2”组中选择(2,cat56,39.6)
- 等等
因此,所需的输出为:
+----+--------+----------+
|Hour|Category|TotalValue|
+----+--------+----------+
| 0| cat26| 30.9|
| 1| cat67| 28.5|
| 2| cat56| 39.6|
| 3| cat8| 35.6|
| ...| ...| ...|
+----+--------+----------+
如果能够选择每个组的前N行,这可能会很方便。
任何帮助都是非常感谢的。
9条答案
按热度按时间2wnc66cl1#
窗口函数:
类似这样的东西应该可以做到这一点:
如果出现严重的数据不对称,此方法将效率低下。此问题由SPARK-34775跟踪,将来可能会得到解决(SPARK-37099)。
纯SQL聚合后跟
join
:或者,您可以使用聚合数据框进行连接:
它将保留重复的值(如果每小时有多个类别具有相同的合计值)。您可以按以下方式删除这些值:
使用
structs
上的排序:虽然没有经过很好的测试,但很简洁,不需要连接或窗口函数:
使用数据集API(Spark 1.6及更高版本、2.0及更高版本):
最后两种方法可以利用Map边合并,不需要完全混洗,因此大多数情况下应该比窗口函数和连接表现出更好的性能。这些方法也可以在
completed
输出模式下与结构化流一起使用。不使用:
它可能看起来工作(特别是在
local
模式下),但它是不可靠的(参见SPARK-16207,链接相关JIRA问题的Tzach Zohar的学分,以及SPARK-30335)。同样的注意事项也适用于
它在内部使用等效的执行计划。
jfewjypa2#
对于按多列分组的Spark 2.0.2:
mspsb9vt3#
这与zero323的answer完全相同,但采用SQL查询方式。
假设 Dataframe 已创建并注册为
窗口函数:
纯SQL聚合后接联接:
对结构使用排序:
gab6jxml4#
您可以使用Spark 3.0中的
max_by()
函数!https://spark.apache.org/docs/3.0.0-preview/api/sql/index.html#max_by
omhiaaxx5#
模式按键分组=〉对每个组执行一些操作,例如reduce =〉返回 Dataframe
我认为Dataframe抽象在这种情况下有点麻烦,所以我使用了RDD功能
j13ufse26#
您可以使用Apache DataFu轻松地完成此操作(其实现类似于Antonin's answer)。
这将导致
(yes,结果将不会按 * 小时 * 排序,但如果重要,您可以在以后执行此操作)
还有一个API - dedupTopN -用于获取前 N 行,另一个API - dedupWithCombiner -用于每个分组需要大量行的情况。
(full我是DataFu项目的一部分)
dxxyhpgq7#
使用dataframe api完成此操作的一种好方法是使用argmax逻辑,如下所示
zzlelutf8#
下面的解决方案只执行一个groupBy,并一次性提取包含maxValue的 Dataframe 行。不需要进一步的联接或窗口。
sf6xfgos9#
你可以这样做-