给定PyparkDataframe的形式:
+----+--------+
|time|messages|
+----+--------+
| t01| [m1]|
| t03|[m1, m2]|
| t04| [m2]|
| t06| [m3]|
| t07|[m3, m1]|
| t08| [m1]|
| t11| [m2]|
| t13|[m2, m4]|
| t15| [m2]|
| t20| [m4]|
| t21| []|
| t22|[m1, m4]|
+----+--------+
我想重构它以压缩包含相同消息的运行(输出的顺序无关紧要,但为了清晰起见,对其进行了排序):
+----------+--------+-------+
|start_time|end_time|message|
+----------+--------+-------+
| t01| t03| m1|
| t07| t08| m1|
| t22| t22| m1|
| t03| t04| m2|
| t11| t15| m2|
| t06| t07| m3|
| t13| t13| m4|
| t20| t20| m4|
| t22| t22| m4|
+----------+--------+-------+
(即处理 message
列作为序列,并标识每个消息的“运行”的开始和结束),
有没有一种干净的方法可以让spark实现这种转变?目前,我把它当作一个6gbtsv来处理。
我愿意接受 toPandas
-如果Pandas有一个干净的方法来做这个聚合的话,就在驱动程序上添加这个和累积。
(请看下面我的答案。)ïve基线实施)。
3条答案
按热度按时间mv1qrgav1#
找到了一种合理的方法,如果在应用窗口操作时可以进行分区,那么可以很好地进行扩展(在任何实际的数据集上都可以进行分区,而在我从中导出此问题的数据集上也可以进行分区)。
为了便于解释,将其分解成块(导入仅在第一个片段中)。
设置:
按时间、延迟排序并生成消息数组的差异,以标识运行的开始和结束:
按时间和消息分组,并对开始表和结束表应用秩。连接,如果为空,则复制
start_time
进入end_time
:yizd12fk2#
您可以使用正向填充尝试以下方法(不需要spark 2.4+):
步骤1:执行以下操作:
对于按时间排序的每一行,查找上一条消息和下一条消息
将消息分解为单个消息
对于每条消息,如果prev\u messages为null或消息不在prev\u messages中,则设置start=time,请参见下面的sql语法:
可以简化为:
如果next\u messages为null或message不在next\u messages中,则设置end=time
代码如下:
步骤2:创建按消息分区的windspec,并向前填充start列。
步骤3:删除end为null的行,然后只选择所需的列:
总结以上步骤,我们有以下几个步骤:
9o685dep3#
您可以在spark 2.4中找到数组函数的信息,explode\u outer是一个explode,在空数组中,它将生成一个具有“null”值的行。
首先要获得每个时刻的消息数组,开始的消息数组,以及每个时刻结束的消息数组(start\u of和end\u of)。
然后,我们只保留消息开始或结束的时刻,然后创建一个包含3列的Dataframe,每个消息开始和结束各一列。当m1和m2被创建时,会产生2行开始,当m1开始和结束时,会产生2行,带一个m1星,和m1结束。
最后,使用窗口功能按“消息”分组并按时间排序,确保如果消息在同一时刻(同一时间)开始和结束,则开始时间将优先。现在我们可以保证在每次开始之后,都会有一个结束行。把它们混合在一起,你就会知道每条信息的开头和结尾。
很好的思考练习。
我已经用scala做了这个例子,但是应该很容易翻译。每行标记为showandcontinue,打印处于该状态的示例以显示其功能。
还有table
在创建的第一个表中,可以优化提取在同一时刻开始和结束的所有元素,因此它们不必再次“匹配”开始和结束,但这取决于这是一个常见情况,还是只是少量情况。它将像这样的优化(相同的窗口)
还有table