PySpark: generate the same ID for consecutive rows

hsvhsicv · posted 2023-03-11 in Spark

I have a DataFrame with a user_id and a boolean flag, and I want to assign a separate group ID to each run of consecutive false flags within the same user_id, but I'm having trouble getting it to work.
The DataFrame looks like this:

+-------+-----+
|user_id| keep|
+-------+-----+
|      1| true|
|      1| true|
|      1|false|
|      1|false|
|      1| true|
|      1| true|
|      2| true|
|      2| true|
|      2|false|
|      2|false|
|      2|false|
|      2| true|
|      2|false|
|      2|false|
|      2| true|
|      3| true|
|      4| true|
|      5| true|
|      5|false|
|      5|false|
+-------+-----+

Expected result:

+-------+-----+-----+
|user_id| keep|group|
+-------+-----+-----+
|      1| true|    0|
|      1| true|    0|
|      1|false|    1|
|      1|false|    1|
|      1| true|    0|
|      1| true|    0|
|      2| true|    0|
|      2| true|    0|
|      2|false|    1|
|      2|false|    1|
|      2|false|    1|
|      2| true|    0|
|      2|false|    2|
|      2|false|    2|
|      2| true|    0|
|      3| true|    0|
|      4| true|    0|
|      5| true|    0|
|      5|false|    1|
|      5|false|    1|
+-------+-----+-----+

Any idea how I can do this?

nvbavucw · 1#

Hopefully I've understood your question correctly. Please check my answer and see whether it does what you intended:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("consecutive_false_flags").getOrCreate()

data = [
    (1, True),
    (1, True),
    (1, False),
    (1, False),
    (1, True),
    (1, True),
    (2, True),
    (2, True),
    (2, False),
    (2, False),
    (2, False),
    (2, True),
    (2, False),
    (2, False),
    (2, True),
    (3, True),
    (4, True),
    (5, True),
    (5, False),
    (5, False)
]

df = spark.createDataFrame(data, ["user_id", "keep"])

# Create an ordered unique_id column to make window functions easier to work with
df = df.withColumn("unique_id", monotonically_increasing_id())

# Create a column that marks where a new group starts, using lag() to check
# whether 'keep' changed from the previous row (true -> false or false -> true):
# 1 if it changed, else 0. Rows where 'keep' is true are forced to 0, since we
# only care about runs of false values. coalesce(..., lit(1)) covers the first
# row of a partition, where lag() returns null (matters if a user's first row is false).
df = df.withColumn("new_group",
                   when(col("keep"), 0)
                   .otherwise(coalesce(
                       (lag("keep").over(Window.partitionBy("user_id").orderBy(col("unique_id"))) != col("keep")).cast("int"),
                       lit(1))))
df.show()
# +-------+-----+-----------+---------+
# |user_id| keep|  unique_id|new_group|
# +-------+-----+-----------+---------+
# |      1| true|          0|        0|
# |      1| true|          1|        0|
# |      1|false|          2|        1|
# |      1|false|          3|        0|
# |      1| true|          4|        0|
# |      1| true| 8589934592|        0|
# |      2| true| 8589934593|        0|
# |      2| true| 8589934594|        0|
# |      2|false| 8589934595|        1|
# |      2|false| 8589934596|        0|
# |      2|false|17179869184|        0|
# |      2| true|17179869185|        0|
# |      2|false|17179869186|        1|
# |      2|false|17179869187|        0|
# |      2| true|17179869188|        0|
# |      3| true|25769803776|        0|
# |      4| true|25769803777|        0|
# |      5| true|25769803778|        0|
# |      5|false|25769803779|        1|
# |      5|false|25769803780|        0|
# +-------+-----+-----------+---------+
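# Note: monotonically_increasing_id() only guarantees increasing values, not
# consecutive ones; the jumps above (e.g. 4 -> 8589934592) are per-partition
# offsets it encodes. That is fine here, because unique_id is only used for
# ordering within the window.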

# Running-sum the new_group markers to get the group ID.
# Again force 'group' to 0 when 'keep' is true; only false runs get an ID.
df = df.withColumn("group", when(col("keep"), 0)
                   .otherwise(sum("new_group").over(Window.partitionBy("user_id").orderBy(col("unique_id")))))
df.show()
# +-------+-----+-----------+---------+-----+
# |user_id| keep|  unique_id|new_group|group|
# +-------+-----+-----------+---------+-----+
# |      1| true|          0|        0|    0|
# |      1| true|          1|        0|    0|
# |      1|false|          2|        1|    1|
# |      1|false|          3|        0|    1|
# |      1| true|          4|        0|    0|
# |      1| true| 8589934592|        0|    0|
# |      2| true| 8589934593|        0|    0|
# |      2| true| 8589934594|        0|    0|
# |      2|false| 8589934595|        1|    1|
# |      2|false| 8589934596|        0|    1|
# |      2|false|17179869184|        0|    1|
# |      2| true|17179869185|        0|    0|
# |      2|false|17179869186|        1|    2|
# |      2|false|17179869187|        0|    2|
# |      2| true|17179869188|        0|    0|
# |      3| true|25769803776|        0|    0|
# |      4| true|25769803777|        0|    0|
# |      5| true|25769803778|        0|    0|
# |      5|false|25769803779|        1|    1|
# |      5|false|25769803780|        0|    1|
# +-------+-----+-----------+---------+-----+

# Drop the two helper columns to get the expected result
df = df.drop("new_group", "unique_id")
df.show()

# +-------+-----+-----+
# |user_id| keep|group|
# +-------+-----+-----+
# |      1| true|    0|
# |      1| true|    0|
# |      1|false|    1|
# |      1|false|    1|
# |      1| true|    0|
# |      1| true|    0|
# |      2| true|    0|
# |      2| true|    0|
# |      2|false|    1|
# |      2|false|    1|
# |      2|false|    1|
# |      2| true|    0|
# |      2|false|    2|
# |      2|false|    2|
# |      2| true|    0|
# |      3| true|    0|
# |      4| true|    0|
# |      5| true|    0|
# |      5|false|    1|
# |      5|false|    1|
# +-------+-----+-----+
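
For completeness, here is a sketch of the same gaps-and-islands logic expressed in Spark SQL (not part of the original answer). It re-attaches an ordering column, since unique_id was dropped above, and registers the frame as a temp view; the names t and sub are illustrative:

# Sketch only: same logic as above, written in Spark SQL.
df.withColumn("unique_id", monotonically_increasing_id()).createOrReplaceTempView("t")
spark.sql("""
    SELECT user_id, keep,
           CASE WHEN keep THEN 0
                ELSE SUM(CASE WHEN NOT keep AND (lag_keep IS NULL OR lag_keep <> keep)
                              THEN 1 ELSE 0 END)
                     OVER (PARTITION BY user_id ORDER BY unique_id)
           END AS `group`
    FROM (SELECT *, LAG(keep) OVER (PARTITION BY user_id ORDER BY unique_id) AS lag_keep
          FROM t) sub
""").show()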

wmomyfyw · 2#

from pyspark.sql.functions import row_number, rank, dense_rank, col, when, monotonically_increasing_id
from pyspark.sql import Window

# Assumes 'dataframe' is the DataFrame from the question.
# Pin down the original row order with an explicit index column.
dataframe = dataframe.withColumn("index", row_number().over(Window.orderBy(monotonically_increasing_id())))

# Number each user's rows, then rank the rows within each (user_id, keep) pair:
# the difference between the two stays constant across a consecutive run of the
# same 'keep' value and changes whenever the run is interrupted.
w = Window.partitionBy("user_id").orderBy("index")
ww = Window.partitionBy("user_id", "keep").orderBy("row_number")
dataframe = (dataframe
             .withColumn("row_number", row_number().over(w))
             .withColumn("rank", rank().over(ww))
             .withColumn("diff", col("row_number") - col("rank")))

# dense_rank over the per-run constant 'diff' numbers the runs 1, 2, ... within
# each (user_id, keep) partition; rows where 'keep' is true are then overwritten with 0.
ww = Window.partitionBy("user_id", "keep").orderBy("diff")
dataframe = (dataframe
             .withColumn("resu", dense_rank().over(ww))
             .withColumn("resu", when(col("keep"), 0).otherwise(col("resu"))))

dataframe.select("user_id", "keep", "resu").show()

Output:

+-------+-----+-----+
|user_id| keep| resu|
+-------+-----+-----+
|      1| true|    0|
|      1| true|    0|
|      1|false|    1|
|      1|false|    1|
|      1| true|    0|
|      1| true|    0|
|      2| true|    0|
|      2| true|    0|
|      2|false|    1|
|      2|false|    1|
|      2|false|    1|
|      2| true|    0|
|      2|false|    2|
|      2|false|    2|
|      2| true|    0|
|      3| true|    0|
|      4| true|    0|
|      5| true|    0|
|      5|false|    1|
|      5|false|    1|
+-------+-----+-----+
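
Why this works (a note on the technique, not part of the original answer): within a user, row_number counts all of that user's rows, while rank counts only the rows sharing the same keep value, so row_number - rank is constant across a consecutive run and jumps whenever the run is broken by the opposite value. For user 2's false rows:

+----------+----+----+
|row_number|rank|diff|
+----------+----+----+
|         3|   1|   2|
|         4|   2|   2|
|         5|   3|   2|
|         7|   4|   3|
|         8|   5|   3|
+----------+----+----+

dense_rank over diff then labels the two runs 1 and 2, and the final when() zeroes out the rows where keep is true.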
