Get the row corresponding to the latest timestamp in PySpark

pod7payv · posted 2021-06-13 in Cassandra

I have a DataFrame:

+--------------+-----------------+-------------------+
|          ecid|    creation_user| creation_timestamp|
+--------------+-----------------+-------------------+
|ECID-195000300|         USER_ID1|2018-08-31 20:00:00|
|ECID-195000300|         USER_ID2|2016-08-31 20:00:00|
+--------------+-----------------+-------------------+

I need the row with the earliest timestamp:

+--------------+-----------------+-------------------+
|          ecid|    creation_user| creation_timestamp|
+--------------+-----------------+-------------------+
|ECID-195000300|         USER_ID2|2016-08-31 20:00:00|
+--------------+-----------------+-------------------+

How can I do this in PySpark? I tried:

df.groupBy("ecid").agg(min("creation_timestamp"))

However, that only gives me the ecid and timestamp columns. I want all of the columns, not just those two.
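
One way to keep every column while staying with groupBy (a sketch of a common workaround, not taken from the answers below) is to aggregate the minimum timestamp per ecid and join it back onto the original DataFrame:

# assumption: the DataFrame shown above is named df
from pyspark.sql import functions as F

min_df = df.groupBy("ecid").agg(F.min("creation_timestamp").alias("creation_timestamp"))
df.join(min_df, on=["ecid", "creation_timestamp"], how="inner").show()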

63lcw9qa 1#

Use the row_number window function, partitioned by ecid and ordered by creation_timestamp. Example:

# sample data
df = spark.createDataFrame(
    [("ECID-195000300", "USER_ID1", "2018-08-31 20:00:00"),
     ("ECID-195000300", "USER_ID2", "2016-08-31 20:00:00")],
    ["ecid", "creation_user", "creation_timestamp"])

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# number rows within each ecid by ascending timestamp, keep the first, drop the helper column
w = Window.partitionBy("ecid").orderBy("creation_timestamp")
df.withColumn("rn", row_number().over(w)).filter(col("rn") == 1).drop("rn").show()

+--------------+-------------+-------------------+
|          ecid|creation_user| creation_timestamp|
+--------------+-------------+-------------------+
|ECID-195000300|     USER_ID2|2016-08-31 20:00:00|
+--------------+-------------+-------------------+
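
Note that the question title asks for the latest timestamp while the body asks for the earliest; if the latest is what is wanted, the same pattern works with a descending sort (a small variation on the answer above, not part of the original):

# variation: order descending to keep the row with the latest timestamp instead
w_desc = Window.partitionBy("ecid").orderBy(col("creation_timestamp").desc())
df.withColumn("rn", row_number().over(w_desc)).filter(col("rn") == 1).drop("rn").show()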

ss2ws0br 2#

I think you need a window function plus a filter. I can suggest the following untested solution:

import pyspark.sql.window as psw
import pyspark.sql.functions as psf

# compute the minimum timestamp per ecid as a new column, then keep only
# the rows whose timestamp equals that minimum
w = psw.Window.partitionBy("ecid")
df = (df.withColumn("min_tmp", psf.min("creation_timestamp").over(w))
        .filter(psf.col("min_tmp") == psf.col("creation_timestamp")))

The window function lets you compute the min of creation_timestamp within each ecid as a new column of your DataFrame.
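
One follow-up (my addition, not part of the answer): the helper column min_tmp remains in the result, so if only the original columns are wanted it can be dropped afterwards:

# drop the helper column so only the original columns remain
df = df.drop("min_tmp")
df.show()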
