pyspark - How to add a new column based on conditions on the current and previous rows

plupiseo posted on 2021-05-27 in Spark

I have a table in this format:

date           dept            rate

2020-07-06     Marketing       20
2020-07-06     Sales           15
2020-07-06     Engg            40
2020-07-06     Sites           18
2020-07-08     Sales           5
2020-07-08     Engg            10
2020-07-08     Sites           7

I want to add a new "spendrate" column so that, for the last two days (July 7 and 8 in the example), the "rate" value from July 6 is copied into "spendrate":

date           dept            rate       Spendrate

2020-07-06     Marketing       20         20
2020-07-06     Sales           15         15
2020-07-06     Engg            40         40
2020-07-06     Sites           18         18
2020-07-07     Marketing       20         20
2020-07-08     Sales           5          15
2020-07-08     Engg            10         40
2020-07-08     Sites           7          18
Answer 1 (ct3nt3jp)

from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as F
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Create the test data, assuming every department has a row for every day

date = ['2020-07-06', '2020-07-06', '2020-07-06', '2020-07-06','2020-07-07','2020-07-07','2020-07-07','2020-07-07', '2020-07-08', '2020-07-08','2020-07-08','2020-07-08' ]
dept = ['Marketing', 'Sales', 'Engg', 'Sites','Marketing', 'Sales', 'Engg', 'Sites','Marketing', 'Sales', 'Engg', 'Sites',]
rate = [20,15,40,18,20, 3, 6, 9, 100,5,10,7]
df = pd.DataFrame({'date': date, 'dept': dept, 'rate': rate})

# Create the Spark DataFrame

sdf = spark.createDataFrame(df)

sdf.show()

+----------+---------+----+
|      date|     dept|rate|
+----------+---------+----+
|2020-07-06|Marketing|  20|
|2020-07-06|    Sales|  15|
|2020-07-06|     Engg|  40|
|2020-07-06|    Sites|  18|
|2020-07-07|Marketing|  20|
|2020-07-07|    Sales|   3|
|2020-07-07|     Engg|   6|
|2020-07-07|    Sites|   9|
|2020-07-08|Marketing| 100|
|2020-07-08|    Sales|   5|
|2020-07-08|     Engg|  10|
|2020-07-08|    Sites|   7|
+----------+---------+----+

# Lag rate by two rows within each dept (= two days, given one row per dept per day)

windowSpec = Window.partitionBy('dept').orderBy('date')
value_column = 'rate_shift'
value_ff = F.lag(sdf['rate'], offset=2).over(windowSpec)
sdf = sdf.withColumn(value_column, value_ff)


sdf.orderBy('date').show()

+----------+---------+----+----------+
|      date|     dept|rate|rate_shift|
+----------+---------+----+----------+
|2020-07-06|Marketing|  20|      null|
|2020-07-06|    Sales|  15|      null|
|2020-07-06|     Engg|  40|      null|
|2020-07-06|    Sites|  18|      null|
|2020-07-07|Marketing|  20|      null|
|2020-07-07|     Engg|   6|      null|
|2020-07-07|    Sales|   3|      null|
|2020-07-07|    Sites|   9|      null|
|2020-07-08|    Sales|   5|        15|
|2020-07-08|     Engg|  10|        40|
|2020-07-08|Marketing| 100|        20|
|2020-07-08|    Sites|   7|        18|
+----------+---------+----+----------+

The rates are shifted by two days: each 2020-07-08 row picks up its department's 2020-07-06 rate.
Answer 2 (mwecs4sa)

Use the window function `first(rate, true)` (first value, ignoring nulls) over a frame built with a `RANGE BETWEEN` clause, so the frame is defined by date rather than by row count. Example:

```
df.show()

+----------+---------+----+
|      date|     dept|rate|
+----------+---------+----+
|2020-07-06|Marketing|  20|
|2020-07-06|    Sales|  15|
|2020-07-06|     Engg|  40|
|2020-07-06|    sites|  18|
|2020-07-08|    Sales|   5|
|2020-07-08|     Engg|  10|
|2020-07-08|    sites|   7|
|2020-07-07|Marketing|  20|
+----------+---------+----+

# Register the DataFrame so the SQL below can refer to it as "tmp"
df.createOrReplaceTempView("tmp")

spark.sql("""
    select *,
           first(rate, true) over (partition by dept
                                   order by cast(date as timestamp)
                                   range between interval 2 days preceding and current row) as Spendrate
    from tmp
    order by date
""").show()
```

For a more specific range, compute `datediff(date, current_date)` and apply the window only when it is -1 or 0 (yesterday or today); every other row keeps its own `rate`. Because the difference is measured against `current_date`, the result depends on the day the query runs.

```
spark.sql("""
    select date, dept, rate,
           case when diff in (-1, 0)
                then first(rate, true) over (partition by dept
                                             order by cast(date as timestamp)
                                             range between interval 2 days preceding and current row)
                else rate
           end as Spendrate
    from (select *, datediff(date, current_date) as diff from tmp) t
    order by date
""").show()

+----------+---------+----+---------+
|      date|     dept|rate|Spendrate|
+----------+---------+----+---------+
|2020-07-06|Marketing|  20|       20|
|2020-07-06|    sites|  18|       18|
|2020-07-06|     Engg|  40|       40|
|2020-07-06|    Sales|  15|       15|
|2020-07-07|Marketing|  20|       20|
|2020-07-08|    sites|   7|       18|
|2020-07-08|     Engg|  10|       40|
|2020-07-08|    Sales|   5|       15|
+----------+---------+----+---------+
```
