Set a new column value based on the latest record

pieyvz9o  published 2021-05-29 in Spark

I have a DataFrame like the one below:

+-------+-------+----------+
|dept_id|user_id|entry_date|
+-------+-------+----------+
|      3|      1|2020-06-03|
|      3|      2|2020-06-03|
|      3|      3|2020-06-03|
|      3|      4|2020-06-03|
|      3|      1|2020-06-04|
|      3|      1|2020-06-05|
+-------+-------+----------+

Now I need to add a new column that indicates whether a row is the user's latest entry date: 1 for the latest record, 0 for older ones.

+-------+-------+----------+----------+
|dept_id|user_id|entry_date|latest_rec|
+-------+-------+----------+----------+
|      3|      1|2020-06-03|         0|
|      3|      2|2020-06-03|         1|
|      3|      3|2020-06-03|         1|
|      3|      4|2020-06-03|         1|
|      3|      1|2020-06-04|         0|
|      3|      1|2020-06-05|         1|
+-------+-------+----------+----------+

I tried computing a rank per user:

val win = Window.partitionBy("dept_id", "user_id").orderBy(asc("entry_date"))
someDF.withColumn("rank_num",rank().over(win))

Now I need to populate the latest_rec column based on the rank_num column. How do I proceed from here?
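
One minimal way to finish that step is sketched below (not part of the original post: it flips the ranking to descending order so that rank 1 marks the newest row, and it reuses the asker's someDF):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{desc, rank, when, col}

// Rank descending so the most recent entry_date per (dept_id, user_id) gets rank 1
val winDesc = Window.partitionBy("dept_id", "user_id").orderBy(desc("entry_date"))

someDF
  .withColumn("rank_num", rank().over(winDesc))
  .withColumn("latest_rec", when(col("rank_num") === 1, 1).otherwise(0))
  .drop("rank_num")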


ccgok5k51#

Instead of rank, use last over a window partitioned by dept_id and user_id, ordered by entry_date, with a frame ranging from the current row to unbounded following; that gives the latest entry date per user. Then compare entry_date with that latest entry date and set the latest-record value accordingly.

scala> df.show
+-------+-------+----------+
|dept_id|user_id|entry_date|
+-------+-------+----------+
|      3|      1|2020-06-03|
|      3|      2|2020-06-03|
|      3|      3|2020-06-03|
|      3|      4|2020-06-03|
|      3|      1|2020-06-04|
|      3|      1|2020-06-05|
+-------+-------+----------+

scala> val win = Window.partitionBy("dept_id","user_id").orderBy("entry_date").rowsBetween(Window.currentRow, Window.unboundedFollowing)
win: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@b3f21c2

scala> df.withColumn("latest_entry_date", last($"entry_date", true).over(win)).show
+-------+-------+----------+-----------------+
|dept_id|user_id|entry_date|latest_entry_date|
+-------+-------+----------+-----------------+
|      3|      1|2020-06-03|       2020-06-05|
|      3|      1|2020-06-04|       2020-06-05|
|      3|      1|2020-06-05|       2020-06-05|
|      3|      3|2020-06-03|       2020-06-03|
|      3|      2|2020-06-03|       2020-06-03|
|      3|      4|2020-06-03|       2020-06-03|
+-------+-------+----------+-----------------+

scala> df.withColumn("latest_entry_date", last($"entry_date", true).over(win)).withColumn("latest_rec", when($"entry_date" === $"latest_entry_date", 1).otherwise(0)).show
+-------+-------+----------+-----------------+----------+
|dept_id|user_id|entry_date|latest_entry_date|latest_rec|
+-------+-------+----------+-----------------+----------+
|      3|      1|2020-06-03|       2020-06-05|         0|
|      3|      1|2020-06-04|       2020-06-05|         0|
|      3|      1|2020-06-05|       2020-06-05|         1|
|      3|      3|2020-06-03|       2020-06-03|         1|
|      3|      2|2020-06-03|       2020-06-03|         1|
|      3|      4|2020-06-03|       2020-06-03|         1|
+-------+-------+----------+-----------------+----------+

nqwrtyyt2#

I would use a row number to find the latest date, then derive your indicator from it.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, desc, when, col}

// Order descending so that row number 1 is the most recent entry per user
val windowSpec = Window.partitionBy("dept_id", "user_id").orderBy(desc("entry_date"))

val win = someDF.withColumn("der_rank", row_number().over(windowSpec))

// Flag the latest record with 1, everything else with 0
val finalDf = win.withColumn("latest_rec", when(col("der_rank") === 1, 1).otherwise(0))
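
As a quick sanity check, a sketch assuming the corrected snippet above and the asker's sample someDF:

// Hypothetical check against the sample data from the question
finalDf
  .orderBy("dept_id", "user_id", "entry_date")
  .show()
// Expect latest_rec = 1 only on each user's most recent entry_date,
// e.g. user 1 is flagged on 2020-06-05 and gets 0 on the two earlier dates.

Note that row_number picks exactly one "latest" row per user even if two rows tie on the same maximum date; rank would flag all tied rows.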

ccgok5k53#

Another alternative:

Load the test data provided:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for .toDS() and $"" (already imported in spark-shell)

val data =
  """
    |dept_id|user_id|entry_date
    |      3|      1|2020-06-03
    |      3|      2|2020-06-03
    |      3|      3|2020-06-03
    |      3|      4|2020-06-03
    |      3|      1|2020-06-04
    |      3|      1|2020-06-05
  """.stripMargin

val stringDS1 = data.split(System.lineSeparator())
  .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
  .toSeq.toDS()
val df1 = spark.read
  .option("sep", ",")
//  .option("inferSchema", "true")
  .option("header", "true")
  .option("nullValue", "null")
  .csv(stringDS1)
df1.show(false)
df1.printSchema()

    /**
      * +-------+-------+----------+
      * |dept_id|user_id|entry_date|
      * +-------+-------+----------+
      * |3      |1      |2020-06-03|
      * |3      |2      |2020-06-03|
      * |3      |3      |2020-06-03|
      * |3      |4      |2020-06-03|
      * |3      |1      |2020-06-04|
      * |3      |1      |2020-06-05|
      * +-------+-------+----------+
      *
      * root
      * |-- dept_id: string (nullable = true)
      * |-- user_id: string (nullable = true)
      * |-- entry_date: string (nullable = true)
      */

Use max(entry_date) over (partition by 'dept_id', 'user_id'):

val w = Window.partitionBy("dept_id", "user_id")
val latestRec = when(datediff(max(to_date($"entry_date")).over(w), to_date($"entry_date")) =!= lit(0), 0)
  .otherwise(1)

df1.withColumn("latest_rec", latestRec)
  .orderBy("dept_id", "user_id", "entry_date")
  .show(false)

    /**
      * +-------+-------+----------+----------+
      * |dept_id|user_id|entry_date|latest_rec|
      * +-------+-------+----------+----------+
      * |3      |1      |2020-06-03|0         |
      * |3      |1      |2020-06-04|0         |
      * |3      |1      |2020-06-05|1         |
      * |3      |2      |2020-06-03|1         |
      * |3      |3      |2020-06-03|1         |
      * |3      |4      |2020-06-03|1         |
      * +-------+-------+----------+----------+
      */
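
The same max-over-partition idea can also be written in Spark SQL. The snippet below is only a sketch added for reference (it assumes df1 from above is registered as a temporary view, and compares the dates directly instead of using datediff):

// Hypothetical Spark SQL equivalent of the max-over-partition approach
df1.createOrReplaceTempView("entries")

spark.sql(
  """
    |SELECT dept_id, user_id, entry_date,
    |       CASE WHEN entry_date = MAX(entry_date) OVER (PARTITION BY dept_id, user_id)
    |            THEN 1 ELSE 0 END AS latest_rec
    |FROM entries
    |ORDER BY dept_id, user_id, entry_date
    |""".stripMargin).show(false)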
