pyspark 如何获得最接近某个时间的时间戳?

thtygnil  于 2023-04-29  发布在  Spark
关注(0)|答案(2)|浏览(125)

我需要选择一个时间戳每小时频率的数据。但是,由于有时时间戳无法提供确切的时间,所以我需要选择时间戳最接近时间的数据。这是我下面的数据框。

| job_id| timestamp                   |item_values   |
|:----  |:---------------------------:|:------------:|
| job1  | 2022-02-15T23:40:00.000+0000|[0.2,3.4,13.2]|       
| job1  | 2022-02-15T23:10:00.000+0000|[0.1,2.9,11.2]|
| job2  | 2022-02-15T23:40:00.000+0000|[1.2,3.1,16.0]|
| job1  | 2022-02-15T23:20:00.000+0000|[0.4,0.4,16.2]|       
| job2  | 2022-02-15T23:10:00.000+0000|[0.7,8.4,11.2]|
| job2  | 2022-02-15T23:20:00.000+0000|[0.3,1.5,19.1]|  
| job3  | 2022-02-15T23:20:00.000+0000|[0.7,7.4,13.2]|
| job3  | 2022-02-15T23:40:00.000+0000|[0.7,7.4,13.2]|
| job3  | 2022-02-15T23:10:00.000+0000|[0.7,7.4,13.2]|

比如说
如果我需要选择下面这个时间
2023-04-25T22:00:00.000+0000
它在表中不可用,但2023-04-25T22:10:00.000+0000是最接近2023-04-25T22:00:00.000+0000的时间,我想选择时间戳为2023-04-25T22:10:00.000+0000的数据。
我想要的输出是

| job_id| timestamp                   |item_values   |
|:----  |:---------------------------:|:------------:|       
| job1  | 2022-02-15T23:10:00.000+0000|[0.1,2.9,11.2]|       
| job2  | 2022-02-15T23:10:00.000+0000|[0.7,8.4,11.2]|
| job3  | 2022-02-15T23:10:00.000+0000|[0.7,7.4,13.2]|

如何使用Pyspark实现这一点?任何帮助都非常感谢!

muk1a3rh

muk1a3rh1#

您希望获取时间戳与您选择的预定义时间戳最接近的行。我可以想到两种方法来解决这个问题,尽管两者都有相同的核心思想:创建一个列,其中包含timestamp列和您要查找的时间戳之间的绝对差值。然后选择在此创建的列中具有最小值的行。

  • 编辑:获取时间戳差异在这里解释 *.
    方法1

确定数据集中与目标时间戳最接近的时间戳值,然后过滤器对找到的时间戳施加相等性:
1.仅选择您的timestamp列并应用distinct():我们只对找到一个现有值感兴趣。
1.添加列“difference”作为 *col('timestamp')- lit(PREDEFINED_TIMESTAMP_VALUE)的绝对值 * 现在这个新列表示每行的时间戳与目标时间戳值之间的差值。
1.选择具有此新列的最小值的行。您刚刚找到了数据集中最接近的时间戳,您在那里查找。
1.过滤原始 Dataframe 施加时间戳==“TIMESTAMP_FOUND”

方法2

与第一种方法非常相似,但直接处理原始数据集。
1.直接添加列“difference”作为col('timestamp')- lit(PREDEFINED_TIMESTAMP_VALUE)的绝对值。现在这个新列表示每行的时间戳与目标时间戳值之间的差值。
1.选择具有此新列的最小值的行。您刚刚找到了数据集中最接近的时间戳,您在那里查找。
我认为方法1应该更有效,但方法2更直接地实施。如果成功了告诉我。

jljoyd4f

jljoyd4f2#

您可以通过以下方式使用Pyspark UDF,以获得与@Sergio建议的结果类似的所需结果。我的初始 Dataframe 如下所示:

data = [["job1", "2022-02-15T23:40:00.000+0000", [0.2,3.4,13.2]],["job1", "2022-02-15T23:10:00.000+0000", [0.1,2.9,11.2]],["job2", "2022-02-15T23:40:00.000+0000", [1.2,3.1,16.0]],["job1", "2022-02-15T23:20:00.000+0000", [0.4,0.4,16.2]],["job2", "2022-02-15T23:10:00.000+0000", [0.7,8.4,11.2]],["job2", "2022-02-15T23:20:00.000+0000", [0.3,1.5,19.1]],["job3", "2022-02-15T23:20:00.000+0000", [0.7,7.4,13.2]],["job3", "2022-02-15T23:40:00.000+0000", [0.7,7.4,13.2]],["job3", "2022-02-15T23:10:00.000+0000", [0.7,7.4,13.2]]]
columns = ["job_id", "timestamp", "item_values"]
df = spark.createDataFrame(data, columns)
df.show(truncate=False)

  • 现在,使用如下所示的代码来根据需要使用UDF和过滤器:
key = "2022-02-15T23:18:00.000+0000"
frmt = "%Y-%m-%dT%H:%M:%S.%f%z"

def diff_ts(ip):
    from datetime import datetime

    t1 = datetime.strptime(key, frmt).timestamp()
    t2 = datetime.strptime(ip, frmt).timestamp()
    return t1-t2

from pyspark.sql.functions import col, udf,abs,min
from pyspark.sql.types import *

convertUDF = udf(lambda z: diff_ts(z),DoubleType())

df1=df.withColumn("new",abs(convertUDF(col("timestamp"))))
min_val = df1.select(min("new")).first()[0]
df1.filter(col("new")==min_val).select("job_id", "timestamp", "item_values").show(truncate=False)

相关问题