pyspark Spark识别二进制列中1序列

b91juud3  于 2022-11-01  发布在  Spark
关注(0)|答案(2)|浏览(147)

我有一列二进制传感器数据。我想识别一个连续的1序列,它表示一个事件正在发生,并获得它持续的时间间隔。这对Spark来说可行吗?这里是我正在处理的数据的一个例子。

如果我可以逐行查看数据,我就可以做到这一点,但我需要先执行collect(),然后所有数据都将在一台机器上。我的一些想法是:是否有办法收集工作节点上的数据,然后在其上执行迭代算法以生成事件信息,然后将数据返回到驱动程序上。我还读到spark 2.2中有一种称为结构化流的东西,但我不确定这是否是我正在寻找的。
欢迎提出其他意见。
顺便说一句,我和pyspark一起工作,我对这个很陌生。

pxy2qtax

pxy2qtax1#

我先看了你的另一个重复问题(后来被标记为重复),所以我会回答这个问题。
是的,您可以像在使用partition by命令的查询中一样使用窗口函数。请参阅How to find longest sequence of consecutive dates?
使用类似的方法,但使用 Dataframe 转换,您可以实现相同的效果。

from pyspark.sql import Window
from pyspark.sql.functions import row_number

lst=[[1,1],[2,1],[3,1],[4,1],[5,0],[6,0],[7,0],[8,1],[9,1],[10,1]]
df=spark.createDataFrame(lst,['ID','Sensor'])

# define the window specification

w=Window.partitionBy(df['Sensor']).orderBy(df['ID'])

group_df=df.select('*',(df['ID']-row_number().over(w)).alias('grp')).orderBy('ID')
count_df=group_df.groupBy('grp').count()

# get result by joining sequence counts df back to df containing original columns

group_df\
  .join(count_df,count_df['grp']==group_df['grp'])\
  .select('ID','Sensor','count')\
  .filter('Sensor=1')\
  .orderBy('ID')\
  .show()

产生所需的序列长度:

+---+------+-----+                                                              
| ID|Sensor|count|
+---+------+-----+
|  1|     1|    4|
|  2|     1|    4|
|  3|     1|    4|
|  4|     1|    4|
|  8|     1|    3|
|  9|     1|    3|
| 10|     1|    3|
+---+------+-----+
umuewwlo

umuewwlo2#

您描述的解决方法(collect on nodes)可以通过RDD.mapPartitions来实现。这允许您提供一个跨整个分区Map的函数,这意味着您可以在数据子集中的连续样本之间迭代。注意,您需要确保确定分区何时以1开始或结束,并组合分区之间的序列。这可能有点难看,但应该是可能的。
请注意,您需要先对数据进行sort(如果尚未进行)。

相关问题