在pyspark的这群Pandas有什么选择呢

omqzjyyz  于 2022-11-21  发布在  Spark
关注(0)|答案(1)|浏览(124)

我正在将我的代码转换为pyspark。请找到下面的示例,还有什么方法可以转换为pyspark?我在pyspark中看不到shift。下面代码的主要用途是检查下一行,并在不相等时累加求和。

example['date_next'] = example.groupby("A")['date'].shift(-1).reset_index()
s = example[['A',
    'B', 
    'C',
    'D']].ne(example[['A','B','C','D']].shift(1)).any(axis=1).cumsum()

先谢谢你了

vof42yt1

vof42yt11#

不那么直接。使用数组会简化。请参阅下面的逻辑和代码

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, 'A', 50, 30, 40, 78, 65, None),
     (1, 'B', 56, 30, 30, 25, 67, 75),
    (1, 'C', 56, 30, 30, 25, 67, 75)],
    ['Id', 'Type', 'A', 'B', 'C', 'D', 'E', 'F'])

df.show()

w =Window.partitionBy().orderBy('id')

(df.withColumn('ABCD', array('A','B','C','D'))#Combine the columns into an array
 .withColumn('ABCDShift',lead('ABCD').over(w))#Shiftrows
 .withColumn('cumsum', when(expr("forall(transform(ABCD, (c,i)-> c==ABCDShift[i]),c->c==true)")=='true',0).otherwise(1))#Check if array elements at each index agree totally. If all agree,o, otherwise 1o, if there
 .withColumn('cumsum',sum('cumsum').over(w.rowsBetween(Window.unboundedPreceding,0)))#Cumsum
                           ).drop('ABCDShift','ABCD').show(truncate=False)

|Id |Type|A  |B  |C  |D  |E  |F   |cumsum|
+---+----+---+---+---+---+---+----+------+
|1  |A   |50 |30 |40 |78 |65 |null|1     |
|1  |B   |56 |30 |30 |25 |67 |75  |1     |
|1  |C   |56 |30 |30 |25 |67 |75  |2     |
+---+----+---+---+---+---+---+----+------+

相关问题