基于另一列值分配id

hlswsv35  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(307)

有没有办法根据state列的值分配操作id?其目的是为每个从开始到结束的序列分配递增的id。例如:在下表中,在2020-09-15 22:49开始的初始操作得到操作id=1,并且直到操作结束的所有行也将得到id 1。每个开始/结束状态以及开始和结束之间的所有“打开”状态将具有相同的id。

Timestamp         |state | operation id
----------------------------------------
2020-09-15 22:53    start   1
2020-09-16 22:53    on      1
2020-09-17 22:53    on      1
2020-09-18 22:53    on      1
2020-09-19 22:53    end     1
2020-09-20 22:53    off     null
2020-09-21 22:53    off     null
2020-09-22 22:53    off     null
2020-09-23 22:53    start   2
2020-09-24 22:53    on      2
2020-09-25 22:53    end     2
2020-09-26 22:53    start   3
2020-09-27 22:53    end     3

时间戳和状态列可用。目的是构建operation id列。

vql8enpb

vql8enpb1#

你可以使用 Window 按“timestamp”排序的函数。既然你想要 operation_id 永远 null 当“state”为“off”时,我会过滤状态为“off”的行,并将其作为单独的Dataframe。我们将“开始”指定为 1 “开”为 0 ,和“结束”为 2 首先,找一个 incremental sum 在这个新的编号上,在您的窗口上指定“state”列。这个 incremental sum 与“结束”状态相对应的总是3的倍数。这也将是你的“结束序列”
为了得到你需要的东西,你必须使用 lag 上的函数 incremental sum 列,然后用滞后值替换3的倍数。最后一步是除以3,将其转换为整数并加1。
现在工会 df_not_off 以及 df_off 对于最终输出
您的Dataframe:

from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import *

schema = StructType([StructField("Timestamp", IntegerType()), StructField("state", StringType())])

data = [[1, 'start'], [2, 'on'], [3, 'on'], [4, 'on'], [5, 'end'], [6, 'off'], [7, 'off'], \
        [8, 'off'], [9, 'start'], [10, 'on'], [11, 'end'], [12, 'start'], [13, 'end']]

df = spark.createDataFrame(data,schema=schema)

df.show()

操作:

df_off = df.filter(col("state")=="off")
df_not_off = df.filter(col("state")!="off")
df_not_off = df_not_off.withColumn("state_num", F.when(col("state")=="start", 1).when(col("state")=="on", 0).otherwise(2))

w=Window().orderBy("Timestamp")

df_not_off = df_not_off.withColumn("incremental_sum", F.sum("state_num").over(w))\
  .withColumn("lag", F.lag("incremental_sum").over(w))\
  .withColumn("incremental_sum", F.when(F.col("incremental_sum")%3==0, F.col("lag")).otherwise(F.col("incremental_sum")))\
  .withColumn("incremental_sum", ((F.col("incremental_sum")/3).cast('integer'))+1)\
  .withColumnRenamed("incremental_sum", "operation_id")\
  .drop("state_num", "lag")

df_off = df_off.withColumn("operation_id", F.lit(None))

final_df = df_not_off.union(df_off)

final_df.orderBy("Timestamp").show()

输出:

+---------+-----+------------+                                                  
|Timestamp|state|operation_id|
+---------+-----+------------+
|        1|start|           1|
|        2|   on|           1|
|        3|   on|           1|
|        4|   on|           1|
|        5|  end|           1|
|        6|  off|        null|
|        7|  off|        null|
|        8|  off|        null|
|        9|start|           2|
|       10|   on|           2|
|       11|  end|           2|
|       12|start|           3|
|       13|  end|           3|
+---------+-----+------------+

相关问题