spark另一种根据日期值获取最新记录的方法

gmxoilav  于 2021-04-03  发布在  Hive
关注(0)|答案(1)|浏览(809)

我有一个输入的日期,但如下面的例子。df_inp

customer_id  |ph_num|date      |
1            |123   |2020-10-01|
2            |456   |2020-10-01|
3            |789   |2020-10-01|
1            |654   |2020-10-02|
2            |543   |2020-10-03|
1            |908   |2020-10-04|
4            |123   |2020-10-02|

我需要获取每一个日常流程的最新记录。所以,我试着用windows rank()操作,结果成功了。但是,由于输入数据数以百万计,为了优化性能,我们是否可以使用其他spark操作来获取基于customer_id和日期值排序的最新数据。

window_func = Window.partition_by("customer_id ").orderBy("date")
df = df.withColumn("rank", rank().over(window_func))
df = df.filter(df.rank == "1")

在这里,customer_id--字符串和日期--时间戳。

izkcnapc

izkcnapc1#

对于spark 3.0以上的版本,可能值得检查一下max_by(或者min_by,如果你在问题中的等级为1)是否比 "window "+"filter "的方法有更好的性能特点。

df.groupBy("customer_id").agg(F.expr("max_by(ph_num,date)"), F.max(F.col("date")))

比较两种方式的执行计划,max_by方式少了一个变换(filter),但两种方式都会触发一次交换。

相关问题