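My data consists of sessions with start and end timestamps. My task is to count, per company and application version, the number of sessions that are "active" in each time interval. I'm starting with 30-minute intervals. So if a company has a session from 2:10 PM to 3:35 PM, that session would be counted for the company in each of 4 bins/intervals (2:00, 2:30, 3:00, 3:30). How can I solve this in Spark/Scala?
Ultimately, I need this to scale to millions of sessions per day.
Here is my sample data: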
// These imports are already in scope in spark-shell and most notebooks.
import org.apache.spark.sql.functions.to_timestamp
import spark.implicits._

val df = sc.parallelize(List(
  ("Company B", "xi2", "2020-07-02T01:07:00.000+0000", "2020-07-02T02:29:00.000+0000"),
  ("Company A", "xi1", "2020-07-01T23:55:00.000+0000", "2020-07-02T01:17:00.000+0000"),
  ("Company B", "xi2", "2020-07-01T22:31:00.000+0000", "2020-07-01T23:53:00.000+0000"),
  ("Company B", "xi1", "2020-07-01T23:07:00.000+0000", "2020-07-02T00:29:00.000+0000"),
  ("Company A", "xi1", "2020-07-01T22:19:00.000+0000", "2020-07-01T23:41:00.000+0000"),
  ("Company B", "xi1", "2020-07-02T00:07:00.000+0000", "2020-07-02T01:29:00.000+0000"),
  ("Company B", "xi1", "2020-07-02T00:55:00.000+0000", "2020-07-02T02:17:00.000+0000"),
  ("Company A", "xi1", "2020-07-02T00:19:00.000+0000", "2020-07-02T01:41:00.000+0000"),
  ("Company A", "xi2", "2020-07-01T22:55:00.000+0000", "2020-07-02T00:17:00.000+0000"),
  ("Company B", "xi2", "2020-07-02T00:43:00.000+0000", "2020-07-02T02:05:00.000+0000"),
  ("Company A", "xi2", "2020-07-01T23:31:00.000+0000", "2020-07-02T00:53:00.000+0000"),
  ("Company B", "xi1", "2020-07-01T23:19:00.000+0000", "2020-07-02T00:41:00.000+0000"),
  ("Company A", "xi2", "2020-07-01T23:43:00.000+0000", "2020-07-02T01:05:00.000+0000"),
  ("Company A", "xi2", "2020-07-02T00:31:00.000+0000", "2020-07-02T01:53:00.000+0000"),
  ("Company A", "xi2", "2020-07-01T22:43:00.000+0000", "2020-07-02T00:05:00.000+0000")
)).toDF("customer", "device_model", "start_timestamp", "end_timestamp")
  // Parse the ISO-8601 strings into proper timestamp columns.
  .withColumn("start_timestamp", to_timestamp($"start_timestamp"))
  .withColumn("end_timestamp", to_timestamp($"end_timestamp"))

// display() is a Databricks notebook helper; use df.show(false) elsewhere.
display(df)
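I'd like my result to look like the following. These counts use 30-minute intervals, but eventually I'll be counting at intervals as small as one or two minutes.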
timeinterval         customer   xi1  xi2
2020-07-01 22:30:00  Company A  1    1
2020-07-01 22:30:00  Company B  0    1
2020-07-01 23:00:00  Company A  1    2
2020-07-01 23:00:00  Company B  1    1
2020-07-01 23:30:00  Company A  1    4
2020-07-01 23:30:00  Company B  2    1
2020-07-02 00:00:00  Company A  1    4
2020-07-02 00:00:00  Company B  3    1
2020-07-02 00:30:00  Company A  2    4
2020-07-02 00:30:00  Company B  3    1
2020-07-02 01:00:00  Company A  2    3
2020-07-02 01:00:00  Company B  2    2
2020-07-02 01:30:00  Company A  2    1
2020-07-02 01:30:00  Company B  2    2
2020-07-02 02:00:00  Company A  0    1
2020-07-02 02:00:00  Company B  1    2
2020-07-02 02:30:00  Company B  1    1
Any help or ideas on the best approach would be greatly appreciated.
1 Answer
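Perhaps this is helpful:
Load the provided test data
Use sequence + an interval step to generate the bins/intervals
Change the interval length in minutes as needed
Use pivot + count to get the counts per interval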
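The steps above can be sketched roughly as follows. This is a minimal illustration, not necessarily the code the answer originally contained: the object `ActiveSessions`, the function `countActive`, and the intermediate column `bin_epoch` are hypothetical names. It assumes `start_timestamp` and `end_timestamp` are already timestamp columns (as in the question's `df`) and a Spark version that provides `sequence` (2.4 or later).

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions._

// Hypothetical helper for illustration only.
object ActiveSessions {
  // Counts sessions that overlap each interval, per customer,
  // with one output column per device_model value.
  def countActive(sessions: DataFrame, intervalMinutes: Int): DataFrame = {
    val stepSeconds = intervalMinutes * 60L // bin width in seconds

    // Round a timestamp down to the start of its bin (epoch seconds).
    def binStart(ts: Column): Column =
      floor(unix_timestamp(ts) / stepSeconds) * stepSeconds

    sessions
      // sequence() fails if stop < start, so drop malformed sessions first.
      .filter(col("end_timestamp") >= col("start_timestamp"))
      // One row per (session, bin the session overlaps).
      .withColumn(
        "bin_epoch",
        explode(sequence(
          binStart(col("start_timestamp")),
          binStart(col("end_timestamp")),
          lit(stepSeconds))))
      // Casting epoch seconds to timestamp; assumes the default
      // (non-ANSI) cast behaviour.
      .withColumn("timeinterval", col("bin_epoch").cast("timestamp"))
      .groupBy("timeinterval", "customer")
      .pivot("device_model")
      .count()
      // Combinations with no sessions come back as null; show them as 0.
      .na.fill(0L)
      .orderBy("timeinterval", "customer")
  }
}
```

With the question's data this would be called as `ActiveSessions.countActive(df, 30)`; passing 1 or 2 instead of 30 gives finer bins. A session from 2:10 to 3:35 is expanded into the 2:00, 2:30, 3:00 and 3:30 bins, and a session that ends exactly on a bin boundary is also counted in the bin that starts there. Note that `explode` multiplies the row count by the number of bins each session spans, so very small intervals on millions of sessions may need care; supplying the pivot values explicitly, for example `.pivot("device_model", Seq("xi1", "xi2"))`, saves Spark a pass to discover them.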