继续讨论stackoverflow问题,因为它没有正确地解释这个问题:如何对上一个组(月)的同一行执行自联接,以便在pyspark中引入具有不同表达式的附加列(以便更好地解释)
根据下面的输入,我必须根据输出数据最后一列中添加的公式导出输出(如图所示)。这里基本上,我需要根据输入中的值1…n列创建新的opt 1…n列。
输入:
|Month_no|value1 |value2 |
| 01 |10 |20 |
| 01 |20 |30 |
| 02 |30 |40 |
| 02 |40 |50 |
| 03 |50 |60 |
| 03 |60 |70 |
| 04 |70 |80 |
| 04 |80 |90 |
输出:
|Month_no|value1 |value2 |opt1 |opt2 |formula(just for explanation) to calculate opt1 an opt 2
| 01 |10 |20 |10 |20 |same as value 1 or value 2 for 01
| 01 |20 |30 |20 |30 |same as value 1 or value 2 for 01
| 02 |30 |40 |40 |60 |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
| 02 |40 |50 |60 |80 |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
| 03 |50 |60 |90 |120 |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
| 03 |60 |70 |120 |150 |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
| 04 |70 |80 |70 |80 |same as value 1 or value 2 for 04
| 04 |80 |90 |80 |90 |same as value 1 or value 2 for 04
我的代码方式:
从输入Dataframe获取不同monthno#id的列表。
month_list = sorted([row['Month_no'] for row in df.select(df.Month_no).distint().collect()])
使用for循环,迭代month\u list变量
for date in month_list:
if date is in 01 or 04 or 07 or 10:
# Iterating on the value columns and creating the new opt1...n columns as same as value1....n values
else:
Filter the data for the current iteration
df_present = df.filter(df.Month_no == "02")
df_previous = df.filter(df.Month_no == "01") # present month - 1
# doing left join considering the value1...n columns from df_present and opt1...n columns (renamed clmn) from df_previous data to calculate the addition.
df_joined = df_present.join(df_previous, on="key_column", left).select(df_present.columns, df_prevous_renamed_columns.columns)
以同样的方式迭代循环并合并数据,这样前一个月计算的新列将对当前月份的迭代有用。然后返回最终的联合Dataframe。
但是,使用for循环会消耗更多的时间来计算大量记录的数据。请求我们是否可以进入任何其他方法??
1条答案
按热度按时间mqkwyuun1#
您可以使用lag window函数,它允许您访问当前行之前的行:
输出:
请记住,没有分区的窗口可能会导致性能问题。如果您有另一个允许分区的列(例如year),您应该始终使用它。