Conditional running total in pyspark/hive

67up9zun  asked on 2021-07-14  in Spark

I have product, brand, and percentage columns. For each row, I want to compute the sum of the percentage column over the rows above it, split into the rows that have a different brand than the current row and the rows that have the same brand. How can I do this in pyspark or with spark.sql?
Sample data:

import pandas as pd

df = pd.DataFrame({'a': ['a1', 'a2', 'a3', 'a4', 'a5', 'a6'],
                   'brand': ['b1', 'b2', 'b1', 'b3', 'b2', 'b1'],
                   'pct': [40, 30, 10, 8, 7, 5]})
df = spark.createDataFrame(df)

What I'm looking for (e.g. for a5, the same-brand rows above it sum to 30, and the different-brand rows sum to 40 + 10 + 8 = 58):

product  brand  pct  pct_same_brand  pct_different_brand
a1       b1     40     null           null
a2       b2     30     null           40
a3       b1     10     40             30
a4       b3     8      null           80
a5       b2     7      30             58
a6       b1     5      50             45

This is what I've tried:

df.createOrReplaceTempView('tmp')
spark.sql("""
select *,
       sum(pct * (select case when n1.brand = n2.brand then 1 else 0 end from tmp n1))
           over (order by pct desc rows between unbounded preceding and 1 preceding)
from tmp n2
""").show()

j91ykkif 1#

You can get the pct_different_brand column by subtracting the partitioned rolling sum (i.e. the pct_same_brand column) from the total rolling sum:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'pct_same_brand',
    # rolling sum of pct over all preceding rows of the same brand;
    # null when no same-brand row appears earlier
    F.sum('pct').over(
        Window.partitionBy('brand')
              .orderBy(F.desc('pct'))
              .rowsBetween(Window.unboundedPreceding, -1)
    )
).withColumn(
    'pct_different_brand',
    # total rolling sum over all preceding rows, minus the same-brand part
    F.sum('pct').over(
        Window.orderBy(F.desc('pct'))
              .rowsBetween(Window.unboundedPreceding, -1)
    ) - F.coalesce(F.col('pct_same_brand'), F.lit(0))
)

df2.show()

+---+-----+---+--------------+-------------------+
|  a|brand|pct|pct_same_brand|pct_different_brand|
+---+-----+---+--------------+-------------------+
| a1|   b1| 40|          null|               null|
| a2|   b2| 30|          null|                 40|
| a3|   b1| 10|            40|                 30|
| a4|   b3|  8|          null|                 80|
| a5|   b2|  7|            30|                 58|
| a6|   b1|  5|            50|                 45|
+---+-----+---+--------------+-------------------+
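Since the question also asks about spark.sql, here is a sketch of the same logic as a single query with two window specs, run against the tmp view registered in the question (column names follow the sample data):

spark.sql("""
SELECT a, brand, pct,
       -- running total of preceding rows with the same brand
       SUM(pct) OVER (PARTITION BY brand ORDER BY pct DESC
                      ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) AS pct_same_brand,
       -- running total of all preceding rows, minus the same-brand part
       SUM(pct) OVER (ORDER BY pct DESC
                      ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING)
         - COALESCE(SUM(pct) OVER (PARTITION BY brand ORDER BY pct DESC
                                   ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING), 0)
         AS pct_different_brand
FROM tmp
""").show()

The frame ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING is empty on the first row of each window, so SUM returns null there, matching the nulls in the expected output.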
