如何计算组中事件之间的时间间隔

nwlls2ji  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(624)

如何在一组事件之间找到时间间隔?
例如,我有流媒体源(kafka),从中我可以得到许多列。这个流被读入spark,经过预处理和清理,只保留这四列:“clienttimestamp”,“sensor\u type”,“activity”,“user\u detail”。
现在,我要计算每个用户的关键活动存在的总时间。

Clientimestamp         Sensor_type     activity         User_detail
4/11/2021 10:00:00      ultrasonic       critical          user_A
4/11/2021 10:00:00      ultrasonic       normal            user_B            
4/11/2021 10:03:00      ultrasonic       normal            user_A
4/11/2021 10:05:00      ultrasonic       critical          user_B
4/11/2021 10:06:00      ultrasonic       critical          user_A
4/11/2021 10:07:00      ultrasonic       critical          user_A
4/11/2021 10:08:00      ultrasonic       critical          user_B
4/11/2021 10:09:00      ultrasonic       critical          user_B

因此,对于用户a,所有关键活动之间的总时间是通过找出两个关键事件之间的差异并将这些差异相加来计算的。

(10:00:00 - 10:06:00)+(10:06:00 - 10:07:00)
 therefore for userA critical activity lasted for total minute of (5+1)= 6 minutes.

同样,对于用户\u b,

(10:05:00 - 10:08:00)+ (10:08:00-10:09:00)
 userB critical activity lasted for total minute of (3+1) = 4 minute

对于每个窗口,我想调用一个自定义函数来计算totaltime。如何在按窗口分组的组上应用函数?

df = df.withWatermark("clientTimestamp", "10 minutes")\
       .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail'), col('activity')) 
       .apply(calculate_time)
disho6za

disho6za1#

看起来可以通过计算窗口中每个用户的最长和最短时间之差来解决这个问题。此外,可以应用活动上的筛选器来忽略“正常”行。
我看不出为什么这里需要应用“calculate\u time”这样的自定义函数。请注意,我并不完全熟悉python语法,但您的代码可能如下所示:

df = df \
  .filter(df.activity == "critical") \
  .withWatermark("clientTimestamp", "10 minutes") \
  .groupby(window(df.clientTimestamp, "10 minutes", "10 minutes"), col('User_detail')) \
  .agg((max("clientTimestamp") - min("clientTimestamp")).alias("time_difference"))

相关问题