如何对上一个组(月)的同一行执行自联接,以在pyspark中引入其他列

8e2ybdfx  于 2021-06-24  发布在  Hive
关注(0)|答案(1)|浏览(411)

继续讨论stackoverflow问题,因为它没有正确地解释这个问题:如何对上一个组(月)的同一行执行自联接,以便在pyspark中引入具有不同表达式的附加列(以便更好地解释)
根据下面的输入,我必须根据输出数据最后一列中添加的公式导出输出(如图所示)。这里基本上,我需要根据输入中的值1…n列创建新的opt 1…n列。
输入:

|Month_no|value1 |value2 |
|  01    |10     |20     |
|  01    |20     |30     |
|  02    |30     |40     |
|  02    |40     |50     |
|  03    |50     |60     |
|  03    |60     |70     |
|  04    |70     |80     |
|  04    |80     |90     |

输出:

|Month_no|value1 |value2 |opt1   |opt2  |formula(just for explanation) to calculate opt1 an opt 2
|  01    |10     |20     |10     |20    |same as value 1 or value 2 for 01
|  01    |20     |30     |20     |30    |same as value 1 or value 2 for 01
|  02    |30     |40     |40     |60    |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
|  02    |40     |50     |60     |80    |add 02 value 1 or value 2 + previous month 01 opt1 or opt 2 value
|  03    |50     |60     |90     |120   |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
|  03    |60     |70     |120    |150   |add 03 value 1 or value 2 + previous month 02 opt1 or opt 2 value
|  04    |70     |80     |70     |80    |same as value 1 or value 2 for 04
|  04    |80     |90     |80     |90    |same as value 1 or value 2 for 04

我的代码方式:
从输入Dataframe获取不同monthno#id的列表。

month_list = sorted([row['Month_no'] for row in df.select(df.Month_no).distint().collect()])

使用for循环,迭代month\u list变量

for date in month_list:
  if date is in 01 or 04 or 07 or 10:
     # Iterating on the value columns and creating the new opt1...n columns as same as value1....n values
  else:
     Filter the data for the current iteration
     df_present = df.filter(df.Month_no == "02")
     df_previous = df.filter(df.Month_no == "01")  # present month - 1
     # doing left join considering the value1...n columns from df_present and opt1...n columns (renamed clmn) from df_previous data to calculate the addition.
     df_joined = df_present.join(df_previous, on="key_column", left).select(df_present.columns, df_prevous_renamed_columns.columns)

以同样的方式迭代循环并合并数据,这样前一个月计算的新列将对当前月份的迭代有用。然后返回最终的联合Dataframe。
但是,使用for循环会消耗更多的时间来计算大量记录的数据。请求我们是否可以进入任何其他方法??

mqkwyuun

mqkwyuun1#

您可以使用lag window函数,它允许您访问当前行之前的行:

import pyspark.sql.functions as F
from pyspark.sql import Window

l = [( '01'    ,10     ,20     )
,(  '01'   ,20     ,30     )
,(  '02'    ,30     ,40     )
,(  '02'    ,40     ,50     )
,(  '03'    ,50     ,60     )
,(  '03'    ,60     ,70     )
,(  '04'    ,70     ,80     )
,(  '04'    ,80     ,90    )]

df = spark.createDataFrame(l, ['Month_no','value1', 'value2' ])

df = df.withColumn('opt1', df.value1)
df = df.withColumn('opt2', df.value2)    

w = Window.orderBy('value1')

# We use lag(COLUMN, 2) to get the row before the previous row

calculationRequiredMonthsE = ['02', '05', '08', '11']
calculationRequiredMonthsU = ['03', '06', '09', '12']

df = df.withColumn('opt1', F.when(F.col('Month_no').isin(calculationRequiredMonthsE), F.col('opt1') + F.lag('opt1', 2).over(w)).otherwise(F.col('opt1')))
df = df.withColumn('opt2', F.when(F.col('Month_no').isin(calculationRequiredMonthsE), F.col('opt2') + F.lag('opt2', 2).over(w)).otherwise(F.col('opt2')))
df = df.withColumn('opt1', F.when(F.col('Month_no').isin(calculationRequiredMonthsU), F.col('opt1') + F.lag('opt1', 2).over(w)).otherwise(F.col('opt1')))
df = df.withColumn('opt2', F.when(F.col('Month_no').isin(calculationRequiredMonthsU), F.col('opt2') + F.lag('opt2', 2).over(w)).otherwise(F.col('opt2')))

df.show()

输出:

+--------+------+------+----+----+
|Month_no|value1|value2|opt1|opt2|
+--------+------+------+----+----+
|      01|    10|    20|  10|  20|
|      01|    20|    30|  20|  30|
|      02|    30|    40|  40|  60|
|      02|    40|    50|  60|  80|
|      03|    50|    60|  90| 120|
|      03|    60|    70| 120| 150|
|      04|    70|    80|  70|  80|
|      04|    80|    90|  80|  90|
+--------+------+------+----+----+

请记住,没有分区的窗口可能会导致性能问题。如果您有另一个允许分区的列(例如year),您应该始终使用它。

相关问题