scala 在Spark中使用Lead Window函数时可以忽略空值吗?

k7fdbhmy  于 2022-11-09  发布在  Scala
关注(0)|答案(3)|浏览(328)

我的 Dataframe 是这样的

id  value  date    
1   100    2017 
1   null   2016 
1   20     2015 
1   100    2014

我想获取最新的前值,但忽略空值

id  value  date   recent value
1   100    2017    20
1   null   2016    20
1   20     2015   100
1   100    2014   null

有没有办法在使用lead窗口函数时忽略空值?

vshtjzan

vshtjzan1#

在Spark中使用Lead Window函数时是否可以忽略空值
但事实并非如此。
我希望获得最新的值,但忽略空值
只需将last(或first)与ignoreNulls一起使用:
def last(columnName: String, ignoreNulls: Boolean): Column
聚合函数:返回组中列的最后一个值。
默认情况下,该函数返回它看到的最后一个值。它将返回它看到的最后一个非NULL值,该值设置为TRUE。如果所有值都为NULL,则返回NULL。

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(
  (1, Some(100), 2017), (1, None, 2016), (1, Some(20), 2015), 
  (1, Some(100), 2014)
).toDF("id", "value", "date")

df.withColumn(
  "last_value",
   last("value", true).over(Window.partitionBy("id").orderBy("date"))
).show

+---+-----+----+----------+                                                     
| id|value|date|last_value|
+---+-----+----+----------+
|  1|  100|2014|       100|
|  1|   20|2015|        20|
|  1| null|2016|        20|
|  1|  100|2017|       100|
+---+-----+----+----------+
hkmswyz6

hkmswyz62#

您可以分两步完成此操作:
1.创建具有非空值的表
1.在原始表上连接

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._

val df = Seq(
  (1, Some(100), 2017),
  (1, None, 2016),
  (1, Some(20), 2015),
  (1, Some(100), 2014)
).toDF("id", "value", "date")

// Step 1
val filledDf = df
  .where($"value".isNotNull)
  .withColumnRenamed("value", "recent_value")

// Step 2
val window: WindowSpec = Window.partitionBy("l.id", "l.date").orderBy($"r.date".desc)

val finalDf = df.as("l")
  .join(filledDf.as("r"), $"l.id" === $"r.id" && $"l.date" > $"r.date", "left")
  .withColumn("rn", row_number().over(window))
  .where($"rn" === 1)
  .select("l.id", "l.date", "value", "recent_value")

finalDf.orderBy($"date".desc).show

+---+----+-----+------------+
| id|date|value|recent_value|
+---+----+-----+------------+
|  1|2017|  100|          20|
|  1|2016| null|          20|
|  1|2015|   20|         100|
|  1|2014|  100|        null|
+---+----+-----+------------+
sh7euo9m

sh7euo9m3#

**Spark 3.2+**在lead中提供ignoreNulls,在Scala中提供lag
Lead(e:Column,Offset:int,defaultValue:Any,IgnreNulls:Boolean):Column
LAG(e:Column,Offset:int,defaultValue:Any,IgnreNulls:Boolean):Column

测试输入:

import org.apache.spark.sql.expressions.Window

val df = Seq[(Integer, Integer, Integer)](
    (1, 100, 2017),
    (1, null, 2016),
    (1, 20, 2015),
    (1, 100, 2014)
).toDF("id", "value", "date")

lead

val w = Window.partitionBy("id").orderBy(desc("date"))
val df2 = df.withColumn("lead_val", lead($"value", 1, null, true).over(w))
df2.show()
// +---+-----+----+--------+
// | id|value|date|lead_val|
// +---+-----+----+--------+
// |  1|  100|2017|      20|
// |  1| null|2016|      20|
// |  1|   20|2015|     100|
// |  1|  100|2014|    null|
// +---+-----+----+--------+

lag

val w = Window.partitionBy("id").orderBy("date")
val df2 = df.withColumn("lead_val", lag($"value", 1, null, true).over(w))
df2.show()
// +---+-----+----+--------+
// | id|value|date|lead_val|
// +---+-----+----+--------+
// |  1|  100|2014|    null|
// |  1|   20|2015|     100|
// |  1| null|2016|      20|
// |  1|  100|2017|      20|
// +---+-----+----+--------+

相关问题