Group by, take the top 3, and write to new columns in Spark Scala

7xllpg7q asked on 2023-10-23 in Apache

I am new to Spark Scala. This is my current DataFrame:

col1        col2    col3    requestErrorCode    errorCodeCount
2023-02-27  LHR     SFO         977                 1
2023-02-27  LHR     SFO         931                 3
2023-02-27  ABC     DEF         977                 1
2023-02-27  ABC     DEF         900                 5
2023-02-27  ABC     DEF         901                 10
2023-02-27  ABC     DEF         902                 12
2023-02-27  ABC     DEF         903                 11
2023-02-27  ABC     DEF         904                 20
2023-02-27  GHI     JKL         800                 3
2023-02-27  GHI     JKL         801                 5
2023-02-27  GHI     JKL         802                 7
2023-02-27  GHI     JKL         803                 100
2023-02-27  GHI     JKL         804                 92
2023-02-27  GHI     JKL         805                 11
2023-02-27  GHI     JKL         806                 17
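
For reference, the sample data above can be rebuilt with something like the following sketch (the val name df, the use of toDF, and a SparkSession named spark are assumptions for illustration):

import spark.implicits._

// Hypothetical reconstruction of the input shown above
val df = Seq(
  ("2023-02-27", "LHR", "SFO", 977, 1),
  ("2023-02-27", "LHR", "SFO", 931, 3),
  ("2023-02-27", "ABC", "DEF", 977, 1),
  ("2023-02-27", "ABC", "DEF", 900, 5),
  ("2023-02-27", "ABC", "DEF", 901, 10),
  ("2023-02-27", "ABC", "DEF", 902, 12),
  ("2023-02-27", "ABC", "DEF", 903, 11),
  ("2023-02-27", "ABC", "DEF", 904, 20),
  ("2023-02-27", "GHI", "JKL", 800, 3),
  ("2023-02-27", "GHI", "JKL", 801, 5),
  ("2023-02-27", "GHI", "JKL", 802, 7),
  ("2023-02-27", "GHI", "JKL", 803, 100),
  ("2023-02-27", "GHI", "JKL", 804, 92),
  ("2023-02-27", "GHI", "JKL", 805, 11),
  ("2023-02-27", "GHI", "JKL", 806, 17)
).toDF("col1", "col2", "col3", "requestErrorCode", "errorCodeCount")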

I want to group by col1, col2 and col3, take the 3 highest errorCodeCount values in each group, and either build a new DataFrame or write it out as CSV (whichever is preferable), with new columns like this:

col1        col2    col3    requestErrorCode1   errorCodeCount1 requestErrorCode2   errorCodeCount2 requestErrorCode3   errorCodeCount3
2023-02-27  LHR     SFO         931                 3               977                 1       
2023-02-27  ABC     DEF         904                 20              902                 12              903                 11
2023-02-27  GHI     JKL         803                 100             804                 92              806                 17

I tried various combinations of ordering, grouping and aggregation, such as the one below, but it did not help:

jointView.orderBy(desc("errorCodeCount"))
  .groupBy(colsGroupByFreq: _*)
  .agg(first("requestErrorCode"), first("errorCodeCount"))

ut6juiuv1#

You first need to define a window function to select the 3 highest errorCodeCount values per group. (An orderBy before groupBy does not survive the shuffle, so the first() in your attempt returns an arbitrary row rather than the largest one.)

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._ // row_number, first, desc
import spark.implicits._                // $"..." column syntax

val w = Window.partitionBy($"col1", $"col2", $"col3").orderBy($"errorCodeCount".desc)

// Number the rows in each (col1, col2, col3) group by descending count, keep the top 3
val df2 = df.withColumn("row_number", row_number().over(w)).where($"row_number" <= 3)

//+----------+----+----+----------------+--------------+----------+
//|col1      |col2|col3|requestErrorCode|errorCodeCount|row_number|
//+----------+----+----+----------------+--------------+----------+
//|2023-02-27|GHI |JKL |803             |100           |1         |
//|2023-02-27|GHI |JKL |804             |92            |2         |
//|2023-02-27|GHI |JKL |806             |17            |3         |
//|2023-02-27|ABC |DEF |904             |20            |1         |
//|2023-02-27|ABC |DEF |902             |12            |2         |
//|2023-02-27|ABC |DEF |903             |11            |3         |
//|2023-02-27|LHR |SFO |931             |3             |1         |
//|2023-02-27|LHR |SFO |977             |1             |2         |
//+----------+----+----+----------------+--------------+----------+
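
Note that row_number breaks ties arbitrarily: if two error codes in a group share the same count, which one makes the top 3 is undefined. If all tied rows should be kept instead, a rank-based variant (sketched here, reusing the same window w) does that:

// Variant keeping ties: rank assigns the same number to equal counts
val df2WithTies = df.withColumn("rn", rank().over(w)).where($"rn" <= 3)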

Once you have this DataFrame, you can pivot on row_number to get the result you need:

df2.groupBy($"col1", $"col2", $"col3")
  .pivot("row_number")
  .agg(first($"requestErrorCode").as("requestErrorCode"), first($"errorCodeCount").as("errorCodeCount"))

//+----------+----+----+------------------+----------------+------------------+----------------+------------------+----------------+
//|col1      |col2|col3|1_requestErrorCode|1_errorCodeCount|2_requestErrorCode|2_errorCodeCount|3_requestErrorCode|3_errorCodeCount|
//+----------+----+----+------------------+----------------+------------------+----------------+------------------+----------------+
//|2023-02-27|GHI |JKL |803               |100             |804               |92              |806               |17              |
//|2023-02-27|ABC |DEF |904               |20              |902               |12              |903               |11              |
//|2023-02-27|LHR |SFO |931               |3               |977               |1               |null              |null            |
//+----------+----+----+------------------+----------------+------------------+----------------+------------------+----------------+
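
To get the exact column names from the question (requestErrorCode1, errorCodeCount1, ...) and write the result out as CSV, a renaming pass like the sketch below should work; the val name pivoted for the pivoted DataFrame above and the output path are placeholders:

val pivoted = df2.groupBy($"col1", $"col2", $"col3")
  .pivot("row_number")
  .agg(first($"requestErrorCode").as("requestErrorCode"), first($"errorCodeCount").as("errorCodeCount"))

// Rename "1_requestErrorCode" -> "requestErrorCode1", "1_errorCodeCount" -> "errorCodeCount1", etc.
val renamed = (1 to 3).foldLeft(pivoted) { (acc, i) =>
  acc.withColumnRenamed(s"${i}_requestErrorCode", s"requestErrorCode$i")
     .withColumnRenamed(s"${i}_errorCodeCount", s"errorCodeCount$i")
}

renamed.write.option("header", "true").csv("/tmp/top3ErrorCodes") // illustrative path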
