追加一个单调递增的id列,该列在列值匹配时递增

insrf1ej  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(378)

我正在接收一个Dataframe,我想附加一个单调递增的列,每当另一列与某个值匹配时,该列就会递增。例如,我有下表

+------+-------+
| Col1 | Col2  |
+------+-------+
| B    |   543 |
| A    |  1231 |
| B    | 14234 |
| B    | 34234 |
| B    |  3434 |
| A    | 43242 |
| B    | 43242 |
| B    | 56453 |
+------+-------+

我想附加一个列,每当col1中出现“a”时,它的值就会增加。所以结果看起来

+------+-------+------+
| Col1 | Col2  | Col3 |
+------+-------+------+
| B    |   543 |    0 |
| A    |  1231 |    1 |
| B    | 14234 |    1 |
| B    | 34234 |    1 |
| B    |  3434 |    1 |
| A    | 43242 |    2 |
| B    | 43242 |    2 |
| B    | 56453 |    2 |
+------+-------+------+

保持最初的秩序很重要。
我试过拉拉链,但似乎没有产生正确的结果。手动将其拆分为单独的seq并以这种方式执行是不够的(想想100+gb的表)。我尝试了一个map函数,它可以在某个地方保留一个计数器,但无法让它工作。
任何正确方向的建议或指针都将不胜感激。

3xiyfsfu

3xiyfsfu1#

使用 windowCol1 = A .

import pyspark.sql.functions as f
from pyspark.sql import Window

w = Window.partitionBy().rowsBetween(Window.unboundedPreceding, Window.currentRow)

df.withColumn('Col3', f.sum(f.when(f.col('Col1') == f.lit('A'), 1).otherwise(0)).over(w)).show()

+----+-----+----+
|Col1| Col2|Col3|
+----+-----+----+
|   B|  543|   0|
|   A| 1231|   1|
|   B|14234|   1|
|   B|34234|   1|
|   B| 3434|   1|
|   A|43242|   2|
|   B|43242|   2|
|   B|56453|   2|
+----+-----+----+
2ic8powd

2ic8powd2#

spark不提供任何默认函数来实现这种功能
我很可能会这样做

//inputDF contains Col1 | Col2 
val df = inputDF.select("Col1").distinct.rdd.zipWithIndex().toDF("Col1","Col2")
val finalDF = inputDF.join(df,df("Col1") === inputDF("Col1"),"left").select(inputDF("*"),"Col3")

但我看到的问题是(join将导致洗牌)。
您也可以在这里检查其他自动增量api。

相关问题