Selecting specific row values based on the sum during an aggregation in an Apache Spark DataFrame with Scala

nzkunb0c posted on 2021-05-27 in Hadoop

I want to aggregate the dataset below on sum(QUANTITY) and select the related row values based on that sum, as described below:
Input:

// Needed for toDF; spark-shell already has this import in scope
import spark.implicits._

val df = Seq(
  ("Acc1","111","1111","Sec1",-4000),
  ("Acc2","222","2222","Sec1",2000),
  ("Acc3","333","3333","Sec1",1000),
  ("Acc4","444","4444","Sec1",-10000)
  ).toDF("ACCOUNT_NO","LONG_IND","SHORT_IND","SECURITY_ID","QUANTITY")

How can I aggregate on SUM(QUANTITY) so that, in the final result:
  (a) if the sum is negative, the values of the row with the most negative QUANTITY (-10000 in this case) take precedence;
  (b) if the sum is positive, the values of the row with the largest positive QUANTITY (2000 in this case) take precedence.

  Here the sum is negative, -4000 + 2000 + 1000 - 10000 = -11000, so case (a) applies to the dataset above and the result should be:

Desired Output after aggregation:  
+-----------+----------+--------+---------+--------+
|SECURITY_ID|ACCOUNT_NO|LONG_IND|SHORT_IND|QUANTITY|
+-----------+----------+--------+---------+--------+
|       Sec1|      Acc4|     444|     4444|  -11000|
+-----------+----------+--------+---------+--------+
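The precedence rule can be checked on plain Scala collections before involving Spark. This is a minimal sketch, not the asker's or answerer's code: the `Position` case class and the `SignedPrecedence.pick` helper are hypothetical names, and a sum of exactly zero is treated like a positive sum (an assumption that mirrors the `otherwise` branch in the answer).

```scala
// Hypothetical in-memory model of one input row; fields mirror the DataFrame columns.
final case class Position(
    accountNo: String,
    longInd: String,
    shortInd: String,
    securityId: String,
    quantity: Long
)

object SignedPrecedence {
  /** Picks the representative row of one group and replaces its quantity with the group sum. */
  def pick(rows: Seq[Position]): Position = {
    val total = rows.map(_.quantity).sum
    // Negative total: the most negative row wins; otherwise (including 0) the largest row wins.
    val winner =
      if (total < 0) rows.minBy(_.quantity)
      else rows.maxBy(_.quantity)
    winner.copy(quantity = total)
  }
}

val input = Seq(
  Position("Acc1", "111", "1111", "Sec1", -4000),
  Position("Acc2", "222", "2222", "Sec1", 2000),
  Position("Acc3", "333", "3333", "Sec1", 1000),
  Position("Acc4", "444", "4444", "Sec1", -10000)
)

// One result per SECURITY_ID, like groupBy in the DataFrame version.
input.groupBy(_.securityId).values.map(rows => SignedPrecedence.pick(rows)).foreach(println)
// prints Position(Acc4,444,4444,Sec1,-11000)
```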

What I tried:

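// As written this does not run: SUM_QUANTITY is an alias defined inside the same
// agg call, so Spark cannot resolve it in the when(...) expressions; also,
// groupBy("SECURITY_ID") plus five aggregates gives six columns, which does not
// match the five names passed to toDF.
val resultDF = df
    .groupBy("SECURITY_ID")
    .agg(
      max($"SECURITY_ID").as("SECURITY_ID"),
      max($"ACCOUNT_NO").as("ACCOUNT_NO"),
      max(when($"SUM_QUANTITY" > 0, $"LONG_IND")).as("LONG_IND"),
      max(when($"SUM_QUANTITY" < 0, $"SHORT_IND")).as("SHORT_IND"),
      sum($"QUANTITY").cast("Long").as("SUM_QUANTITY")
    )
    .toDF("SECURITY_ID", "ACCOUNT_NO","LONG_IND","SHORT_IND","QUANTITY")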
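The groupBy attempt above can be made to work without window functions by aggregating whole rows as structs. The sketch below is an editor's alternative, not the accepted answer's method, and assumes a local SparkSession (`local[1]`); the names `rowStruct`, `MIN_ROW`, `MAX_ROW` and `PICKED` are hypothetical. It relies on Spark ordering structs field by field from the left, so with QUANTITY as the first field, `min` returns the row with the most negative QUANTITY and `max` the row with the largest one.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Assumed local session; in spark-shell getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("struct-pick").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Acc1", "111", "1111", "Sec1", -4000),
  ("Acc2", "222", "2222", "Sec1", 2000),
  ("Acc3", "333", "3333", "Sec1", 1000),
  ("Acc4", "444", "4444", "Sec1", -10000)
).toDF("ACCOUNT_NO", "LONG_IND", "SHORT_IND", "SECURITY_ID", "QUANTITY")

// QUANTITY comes first so that struct ordering is driven by it.
val rowStruct = struct($"QUANTITY", $"ACCOUNT_NO", $"LONG_IND", $"SHORT_IND")

val resultDF = df
  .groupBy($"SECURITY_ID")
  .agg(
    sum($"QUANTITY").cast("long").as("SUM_QUANTITY"),
    min(rowStruct).as("MIN_ROW"), // row with the most negative QUANTITY
    max(rowStruct).as("MAX_ROW")  // row with the largest QUANTITY
  )
  // SUM_QUANTITY exists only after agg, so the choice happens in a later step.
  .withColumn("PICKED", when($"SUM_QUANTITY" < 0, $"MIN_ROW").otherwise($"MAX_ROW"))
  .select(
    $"SECURITY_ID",
    $"PICKED.ACCOUNT_NO".as("ACCOUNT_NO"),
    $"PICKED.LONG_IND".as("LONG_IND"),
    $"PICKED.SHORT_IND".as("SHORT_IND"),
    $"SUM_QUANTITY".as("QUANTITY")
  )

resultDF.show()
```

This keeps one shuffle for the groupBy and avoids the alias problem by deciding between the two candidate rows only after the sum has been computed.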

Is it possible to use rank in some way to get this result?

Answer 1 (by xyhw6mcr):

Yes, you can achieve this with a window function and row_number():

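import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Unordered window: the sum covers the whole SECURITY_ID partition
// (with an orderBy it would become a running sum)
val window = Window.partitionBy($"SECURITY_ID")
// Most negative QUANTITY first, used when the sum is negative
val windowAsc = Window.partitionBy($"SECURITY_ID").orderBy($"QUANTITY".asc)
// Largest QUANTITY first, used otherwise
val windowDesc = Window.partitionBy($"SECURITY_ID").orderBy($"QUANTITY".desc)

df
  .withColumn("sum", sum($"QUANTITY").over(window))
  .withColumn("rnb", when($"sum" < 0, row_number().over(windowAsc)).otherwise(row_number().over(windowDesc)))
  .where($"rnb" === 1)
  .withColumn("QUANTITY", $"sum")
  .drop("rnb", "sum")
  .show()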

This gives:

+----------+--------+---------+-----------+--------+
|ACCOUNT_NO|LONG_IND|SHORT_IND|SECURITY_ID|QUANTITY|
+----------+--------+---------+-----------+--------+
|      Acc4|     444|     4444|       Sec1|  -11000|
+----------+--------+---------+-----------+--------+
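The two ordered windows can also be folded into one by flipping the sign of the sort key when the sum is not negative. This is a sketch of that variant under the same assumptions as before (local SparkSession; `bySecurity` and `ranked` are hypothetical names), not the answerer's code.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumed local session; in spark-shell getOrCreate returns the existing one.
val spark = SparkSession.builder().master("local[1]").appName("signed-window").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Acc1", "111", "1111", "Sec1", -4000),
  ("Acc2", "222", "2222", "Sec1", 2000),
  ("Acc3", "333", "3333", "Sec1", 1000),
  ("Acc4", "444", "4444", "Sec1", -10000)
).toDF("ACCOUNT_NO", "LONG_IND", "SHORT_IND", "SECURITY_ID", "QUANTITY")

// Unordered window for the per-security total.
val bySecurity = Window.partitionBy($"SECURITY_ID")
val withSum = df.withColumn("sum", sum($"QUANTITY").over(bySecurity))

// Ascending on QUANTITY when the sum is negative (most negative first),
// ascending on -QUANTITY otherwise (largest first).
val ranked = Window
  .partitionBy($"SECURITY_ID")
  .orderBy(when($"sum" < 0, $"QUANTITY").otherwise(negate($"QUANTITY")))

val resultDF = withSum
  .withColumn("rnb", row_number().over(ranked))
  .where($"rnb" === 1)
  .withColumn("QUANTITY", $"sum")
  .drop("rnb", "sum")

resultDF.show()
```

On the rank question: rank() would give the same result here, but it assigns rank 1 to every row tied on the extreme QUANTITY, so it can return several rows per security. row_number() always keeps exactly one; which tied row it keeps is not deterministic unless a tiebreaker column (for example ACCOUNT_NO) is added to the orderBy.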
