我正在尝试使用窗口在我的df中创建一个名为indexcp的新列,如果没有以前的indexcp do 100*(当前的\u df['return']+1),我想从indexcp*(当前的\u df['return']+1)中获取以前的值。
column_list = ["id","secname"]
windowval = (Window.partitionBy(column_list).orderBy(col('calendarday').cast("timestamp").cast("long")).rangeBetween(Window.unboundedPreceding, 0))
spark_df = spark_df.withColumn('indexCP', when(spark_df["PreviousYearUnique"] == spark_df["yearUnique"], 100 * (current_df['return']+1)).otherwise(last('indexCP').over(windowval) * (current_df['return']+1)))
当我运行上述代码时,我得到一个错误“analysisexception:”cannot resolve' indexCP
“给定的输入列:”我认为这意味着你不能接受一个尚未创建的值,但我不确定如何修复它。
Starting Data Frame
## +---+-----------+----------+------------------+
## | id|calendarday| secName| return|
## +---+-----------+----------+------------------+
## | 1|2015-01-01 | 1| 0.0076|
## | 1|2015-01-02 | 1| 0.0026|
## | 1|2015-01-01 | 2| 0.0016|
## | 1|2015-01-02 | 2| 0.0006|
## | 2|2015-01-01 | 3| 0.0012|
## | 2|2015-01-02 | 3| 0.0014|
## +---+----------+-----------+------------------+
New Data Frame IndexCP added
## +---+-----------+--------+---------+------------+
## | id|calendarday| secName| return| IndexCP|
## +---+-----------+--------+---------+------------+
## | 1|2015-01-01 | 1| 0.0076| 100.76|(1st 100*(return+1))
## | 1|2015-01-02 | 1| 0.0026| 101.021976|(2nd 100.76*(return+1))
## | 2|2015-01-01 | 2| 0.0016| 100.16|(1st 100*(return+1))
## | 2|2015-01-02 | 2| 0.0006| 100.220096|(2nd 100.16*(return+1))
## | 3|2015-01-01 | 3| 0.0012| 100.12 |(1st 100*(return+1))
## | 3|2015-01-02 | 3| 0.0014| 100.260168|(2nd 100.12*(return+1))
## +---+----------+---------+---------+------------+
1条答案
按热度按时间zzzyeukh1#
编辑:这应该是最后的答案了,我已经延长了一行
secName
列。你要找的是一个滚动积函数,用你的
IndexCP * (current_return + 1)
. 首先,您需要将所有现有收益汇总到ArrayType
然后聚合。这可以通过一些sparksql来实现aggregate
功能,例如:说明:起始值必须是1,并且100的乘数必须在表达式的外部,否则您确实开始偏离预期收益的100倍。
例如,我已经验证了现在的值是否符合您的公式
secName == 1 and id == 1
:根据公式,这是正确的
(acc, x) -> acc * (1+x)
. 希望这有帮助!