在Spark Dataframe中的窗口上创建组ID

p3rjfoxz  于 2022-11-25  发布在  Apache
关注(0)|答案(3)|浏览(137)

我有一个 Dataframe ,我想在每个窗口分区中给予id。

id | col |
1  |  a  |
2  |  a  |
3  |  b  |
4  |  c  |
5  |  c  |

所以我想(基于与列col的分组)

id | group |
1  |  1    |
2  |  1    |
3  |  2    |
4  |  3    |
5  |  3    |

我想使用一个窗口函数,但我找不到任何方式来为每个窗口分配一个ID。我需要这样的东西:

w = Window().partitionBy('col')
df = df.withColumn("group", id().over(w))

有什么方法可以实现这样的效果吗?(我不能简单地使用col作为组ID,因为我对在多个列上创建窗口感兴趣)

rjee0c15

rjee0c151#

只需使用dense_rank * 内置函数而不是Window函数 *,即可给予所需的结果

from pyspark.sql import window as W
import pyspark.sql.functions as f
df.select('id', f.dense_rank().over(W.Window.orderBy('col')).alias('group')).show(truncate=False)

给予你就能

+---+-----+
|id |group|
+---+-----+
|1  |1    |
|2  |1    |
|3  |2    |
|4  |3    |
|5  |3    |
+---+-----+
iszxjhcz

iszxjhcz2#

您可以使用原始 Dataframe 为不同的col和自身join分配row_number

val data = Seq(
  (1, "a"),
  (2, "a"),
  (3, "b"),
  (4, "c"),
  (5, "c")
).toDF("id","col")

val df2 = data.select("col").distinct()
  .withColumn("group", row_number().over(Window.orderBy("col")))

val result = data.join(df2, Seq("col"), "left")
    .drop("col")

代码的格式为scala,但可以轻松更改为pyspark
希望这对你有帮助

bfhwhh0e

bfhwhh0e3#

我基于@koiralo编译了这个答案,它允许对多个列进行分组,并决定是否应该删除它们。我使用F.monotonically_increasing_id()来避免OOM问题,缺点是这些数字不会以1为单位增加,而应该被视为随机数。

from typing import List
from pyspark.sql.dataframe import DataFrame

data = [[1, "a","la"], [2, "a","le"], [3, "b","la"],[4,"c","di"],[5,"c","di"]]
df = spark.createDataFrame(data, schema="id LONG, col STRING, other_col STRING")

def n_group(df:DataFrame, groupby: List[str], group_name:str = "group", drop_cols:bool = False)->DataFrame:
    groupby = [groupby] if isinstance(groupby, str) else groupby
    df_distinct = df.select(groupby).distinct()
    df_groups = df_distinct.withColumn(colName = group_name, col = F.monotonically_increasing_id())
    res = df.join(other=df_groups, on=groupby, how="left")
    if drop_cols: res = res.drop(*groupby)
    return res

n_group(df=df, groupby =["col"],group_name="group", drop_cols = False).show()

相关问题