我正在将代码从SAS迁移到PySpark,我正在努力解决以下SAS retain语句:
data ds;
set ds;
by group date;
retain Target;
if first.group then Target = Orig;
if first.group and ( Orig in (1,2,3,4,5) ) then Target = 6;
if not first.group and Target = 6 and (Orig in (1,2,3,4,5) ) then Target = 6 ;
if not first.group and ~(Target = 6 and (Orig in (1,2,3,4,5) ) ) then Target = Orig ;
run;
如何才能做到这一点?
如果第一个原始值不为0,则向上舍入为6。如果不是组中的第一个,并且目标已经为6,并且原始值在1、2、3、4、5中,则将目标保持为6。如果不是组中的第一个,并且目标不是6或原始值不在1、2、3、4、5中,则将目标设置为原始值。
我为一个单独的组提供了一个例子(为冗长的一个道歉):
df = SparkSession.createDataFrame([
(999, 5,6) ,
(999, 6,6) ,
(999, 4,6) ,
(999, 6,6) ,
(999, 3,6) ,
(999, 5,6) ,
(999, 4,6) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 5,6) ,
(999, 3,6) ,
(999, 2,6) ,
(999, 2,6) ,
(999, 2,6) ,
(999, 2,6) ,
(999, 2,6) ,
(999, 2,6) ,
(999, 2,6) ,
(999, 2,6) ,
(999, 1,6) ,
(999, 0,0) ,
(999, 0,0) ,
(999, 0,0) ,
(999, 0,0) ,
(999, 1,1) ,
(999, 1,1) ,
(999, 2,2) ,
(999, 2,2) ,
(999, 3,3) ,
(999, 2,2) ,
(999, 3,3) ,
(999, 4,4) ,
(999, 5,5) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 0,0) ,
(999, 1,1) ,
(999, 0,0) ,
(999, 1,1) ,
(999, 2,2) ,
(999, 3,3) ,
(999, 4,4) ,
(999, 5,5) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 6,6) ,
(999, 4,6) ,
(999, 3,6) ,
(999, 2,6) ,
(999, 3,6) ,
(999, 4,6) ,
(999, 5,6) ,
(999, 6,6) ,
(999, 6,6) ],
['Group', 'Orig', 'Target']
)
1条答案
按热度按时间sd2nnvve1#
没有简单的方法来复制SAS的
retain
。但是可以建立在逻辑上。retain
,在某种程度上,正在查看以前计算的值来计算当前值。* 所以,你本质上是滞后于你正在创建的列,而你正在创建它。*pyspark可以使用结构体数组和高阶函数来实现。
下面是实现方法 (注意,我添加了一个日期字段-
dt
-这有助于数据排序-类似于您的by group date;
语句)解释
aggregate
高阶函数接受一个数组,初始值和一个要合并的函数(类似于python的reduce
)。target
(这是first.group
计算)。not first.group
条件的地方y
是当前正在计算的值,它查看先前计算的值,而先前计算的值又是来自x
(element_at(x, -1)
)的最后一个值exp_tgt
(预期目标)是问题中共享的示例数据中已有的target
字段。tgt
是pyspark生成的最终目标字段。*对于那些由于旧版本的spark而无法使用
aggregate
函数的人,他们可以使用expr()
中的aggregate
SQL函数,如下所示。