SQL / Spark: using a DataFrame column's values as the GROUP BY clause

mctunoxg · posted 2021-05-27 in Spark

I have data that looks like the table below:

+----+----+--------+-------+--------+----------------------+
|User|Shop|Location| Seller|Quantity|         GroupBYClause|
+----+----+--------+-------+--------+----------------------+
|   1| ABC|    Loc1|Seller1|      10|        Shop, location|
|   1| ABC|    Loc1|Seller2|      10|        Shop, location|
|   2| ABC|    Loc1|Seller1|      10|Shop, location, Seller|
|   2| ABC|    Loc1|Seller2|      10|Shop, location, Seller|
|   3| BCD|    Loc1|Seller1|      10|              location|
|   3| BCD|    Loc1|Seller2|      10|              location|
|   3| CDE|    Loc2|Seller3|      10|              location|
+----+----+--------+-------+--------+----------------------+

The expected final output is the same data with one additional column, Sum(Quantity), where the sum of Quantity is computed according to the aggregation each user specifies.
For example, user 1 gives "Shop, location" as the GroupBYClause, so irrespective of Seller, Sum(Quantity) for user 1 is 20.
Similarly, for user 2 the GroupBYClause is "Shop, location, Seller", so Sum(Quantity) is 10 for each of that user's rows.
Expected output:

+------+----+--------+-------+--------+----------------------+-------------+
|UserId|Shop|location| Seller|Quantity|         GroupBYClause|Sum(Quantity)|
+------+----+--------+-------+--------+----------------------+-------------+
|     1| ABC|    Loc1|Seller1|      10|        Shop, location|           20|
|     1| ABC|    Loc1|Seller2|      10|        Shop, location|           20|
|     2| ABC|    Loc1|Seller1|      10|Shop, location, Seller|           10|
|     2| ABC|    Loc1|Seller2|      10|Shop, location, Seller|           10|
|     3| BCD|    Loc1|Seller1|      10|              location|           20|
|     3| BCD|    Loc1|Seller2|      10|              location|           20|
|     3| CDE|    Loc2|Seller3|      10|              location|           10|
+------+----+--------+-------+--------+----------------------+-------------+

The challenge I am facing is using a column's value as the GROUP BY clause in Spark.
Please help. Code to create the sample DataFrame:

val df = spark.createDataFrame(Seq(
    (1, "ABC","Loc1","Seller1", 10, "Shop, location"),
    (1, "ABC","Loc1","Seller2", 10, "Shop, location"),
    (2, "ABC","Loc1","Seller1", 10, "Shop, location, Seller"),
    (2, "ABC","Loc1","Seller2", 10, "Shop, location, Seller"),
    (3, "BCD","Loc1","Seller1", 10, "location"),
    (3, "BCD","Loc1","Seller2", 10, "location"),
    (3, "CDE","Loc2","Seller3", 10, "location")
  )).toDF("UserId","Shop", "Location","Seller", "Quantity", "GroupBYClause")
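For reference, the per-row grouping the question describes can be sketched without Spark: group each row by its user id plus the values of the columns its own GroupBYClause names, then sum Quantity per group. A minimal pure-Python sketch (column names as in the question; the case-insensitive name lookup is an assumption, added to tolerate "location" vs "Location"):

```python
# Pure-Python sketch of the per-row GROUP BY logic: each row carries the names
# of the columns it should be grouped by; rows sharing the same user and the
# same key values are summed together.
rows = [
    {"UserId": 1, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller1",
     "Quantity": 10, "GroupBYClause": "Shop, location"},
    {"UserId": 1, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller2",
     "Quantity": 10, "GroupBYClause": "Shop, location"},
    {"UserId": 2, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller1",
     "Quantity": 10, "GroupBYClause": "Shop, location, Seller"},
    {"UserId": 2, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller2",
     "Quantity": 10, "GroupBYClause": "Shop, location, Seller"},
    {"UserId": 3, "Shop": "BCD", "Location": "Loc1", "Seller": "Seller1",
     "Quantity": 10, "GroupBYClause": "location"},
    {"UserId": 3, "Shop": "BCD", "Location": "Loc1", "Seller": "Seller2",
     "Quantity": 10, "GroupBYClause": "location"},
    {"UserId": 3, "Shop": "CDE", "Location": "Loc2", "Seller": "Seller3",
     "Quantity": 10, "GroupBYClause": "location"},
]

def group_key(row):
    # Resolve each name in the clause to its column value, case-insensitively.
    names = {k.lower(): k for k in row}
    cols = [c.strip() for c in row["GroupBYClause"].split(",")]
    return (row["UserId"],) + tuple(row[names[c.lower()]] for c in cols)

totals = {}
for r in rows:
    k = group_key(r)
    totals[k] = totals.get(k, 0) + r["Quantity"]

for r in rows:
    r["Sum(Quantity)"] = totals[group_key(r)]
```

Each row ends up carrying the total of its dynamic group, which is exactly the Sum(Quantity) column in the expected output.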

ohfgkhjo (answer 1)

Try this:

Load the provided test data:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// the snippets below assume df1 is the question's DataFrame with the
// user column named "User", as in the output shown here
val df1 = df.withColumnRenamed("UserId", "User")

df1.show(false)
df1.printSchema()
    /**
      * +----+----+--------+-------+--------+----------------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |
      * +----+----+--------+-------+--------+----------------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |
      * |3   |BCD |Loc1    |Seller1|10      |location              |
      * |3   |BCD |Loc1    |Seller2|10      |location              |
      * |3   |CDE |Loc2    |Seller3|10      |location              |
      * +----+----+--------+-------+--------+----------------------+
      *
      * root
      * |-- User: integer (nullable = true)
      * |-- Shop: string (nullable = true)
      * |-- Location: string (nullable = true)
      * |-- Seller: string (nullable = true)
      * |-- Quantity: integer (nullable = true)
      * |-- GroupBYClause: string (nullable = true)
      */

Compute the sum:

val isShopLocation = Seq("Shop", "location").map(array_contains($"arr", _)).reduce(_ && _)
val isShopLocationSeller = Seq("Shop", "location", "Seller").map(array_contains($"arr", _)).reduce(_ && _)
val isLocation = array_contains($"arr", "location")

df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*"))
  .withColumn("arr",
    when(isShopLocationSeller, expr("array(Shop, location, Seller)"))
      .when(isShopLocation, expr("array(Shop, location)"))
      .when(isLocation, expr("array(location)")))
  .withColumn("sum_quantity", sum("Quantity").over(Window.partitionBy("User", "arr")))
  .show(false)

    /**
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |sum_quantity|
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |20          |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |20          |
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|10          |
      * |3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |10          |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|10          |
      * |3   |BCD |Loc1    |Seller1|10      |location              |[Loc1]              |20          |
      * |3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |20          |
      * +----+----+--------+-------+--------+----------------------+--------------------+------------+
      */
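The chain of when(...) conditions above is an ordered classification of the clause, most specific first: a clause containing all three names also contains the first two, so the three-column test must run before the two-column one. In plain Python terms, assuming the same three clause shapes:

```python
# Ordered classification mirroring the when(...).when(...) chain above.
# The most specific condition (Shop, location, Seller) is checked first,
# because a clause containing all three also satisfies the weaker tests.
def key_columns(clause):
    arr = [c.strip() for c in clause.split(",")]
    if all(c in arr for c in ("Shop", "location", "Seller")):
        return ("Shop", "location", "Seller")
    if all(c in arr for c in ("Shop", "location")):
        return ("Shop", "location")
    if "location" in arr:
        return ("location",)
    return None  # no when(...) matched: Spark would yield null here
```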

Define the partition key dynamically:

val columns = Seq("Shop", "location", "Seller").flatMap(f => Seq(lit(f), col(f)))

df1.withColumn("arr", split($"GroupBYClause", "\\s*,\\s*"))
  .withColumn("map1", map(columns: _*))
  .withColumn("arr", expr("TRANSFORM(arr, x -> map1[x])"))
  .withColumn("sum_quantity", sum("Quantity").over(Window.partitionBy("User", "arr")))
  .show(false)

    /**
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      * |User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |map1                                              |sum_quantity|
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      * |1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20          |
      * |1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20          |
      * |2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10          |
      * |3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10          |
      * |2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10          |
      * |3   |BCD |Loc1    |Seller1|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|20          |
      * |3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|20          |
      * +----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
      */
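The map1 / TRANSFORM trick can be mirrored in plain Python, which may make the window key easier to see: each row projects the names in its clause through its own name-to-value map, and Quantity is summed over (User, projected key). A sketch with the data inlined as (user, name-to-value map, quantity, clause) tuples:

```python
# Pure-Python mirror of the map1 / TRANSFORM step: per row, project the split
# clause through a column-name -> value map to get "arr", then sum Quantity
# over the window key (user, arr).
from collections import defaultdict

rows = [
    ("1", {"Shop": "ABC", "location": "Loc1", "Seller": "Seller1"}, 10, "Shop, location"),
    ("1", {"Shop": "ABC", "location": "Loc1", "Seller": "Seller2"}, 10, "Shop, location"),
    ("2", {"Shop": "ABC", "location": "Loc1", "Seller": "Seller1"}, 10, "Shop, location, Seller"),
    ("2", {"Shop": "ABC", "location": "Loc1", "Seller": "Seller2"}, 10, "Shop, location,Seller"),
    ("3", {"Shop": "BCD", "location": "Loc1", "Seller": "Seller1"}, 10, "location"),
    ("3", {"Shop": "BCD", "location": "Loc1", "Seller": "Seller2"}, 10, "location"),
    ("3", {"Shop": "CDE", "location": "Loc2", "Seller": "Seller3"}, 10, "location"),
]

def arr(map1, clause):
    # split on "," with optional surrounding spaces, like split(.., "\\s*,\\s*")
    return tuple(map1[name.strip()] for name in clause.split(","))

sums = defaultdict(int)
for user, map1, qty, clause in rows:
    sums[(user, arr(map1, clause))] += qty          # window partition key

result = [sums[(user, arr(map1, clause))] for user, map1, qty, clause in rows]
```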

Edit-1 (based on the comments)

// input
+----+----+--------+-------+--------+----------------------+
|User|Shop|Location|Seller |Quantity|GroupBYClause         |
+----+----+--------+-------+--------+----------------------+
|1   |ABC |Loc1    |Seller1|10      |Shop, location        |
|1   |ABC |Loc1    |Seller2|10      |Shop, location        |
|2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|
|2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |
|3   |BCD |Loc1    |Seller1|10      |location,Seller       |
|3   |BCD |Loc1    |Seller2|10      |location              |
|3   |CDE |Loc2    |Seller3|10      |location              |
+----+----+--------+-------+--------+----------------------+

root
 |-- User: integer (nullable = true)
 |-- Shop: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Seller: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- GroupBYClause: string (nullable = true)

// Output
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
|User|Shop|Location|Seller |Quantity|GroupBYClause         |arr                 |map1                                              |sum_quantity|
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+
|1   |ABC |Loc1    |Seller1|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller1]|20          |
|1   |ABC |Loc1    |Seller2|10      |Shop, location        |[ABC, Loc1]         |[Shop -> ABC, location -> Loc1, Seller -> Seller2]|20          |
|2   |ABC |Loc1    |Seller2|10      |Shop, location,Seller |[ABC, Loc1, Seller2]|[Shop -> ABC, location -> Loc1, Seller -> Seller2]|10          |
|3   |BCD |Loc1    |Seller1|10      |location,Seller       |[Loc1, Seller1]     |[Shop -> BCD, location -> Loc1, Seller -> Seller1]|10          |
|3   |CDE |Loc2    |Seller3|10      |location              |[Loc2]              |[Shop -> CDE, location -> Loc2, Seller -> Seller3]|10          |
|2   |ABC |Loc1    |Seller1|10      |Shop, location, Seller|[ABC, Loc1, Seller1]|[Shop -> ABC, location -> Loc1, Seller -> Seller1]|10          |
|3   |BCD |Loc1    |Seller2|10      |location              |[Loc1]              |[Shop -> BCD, location -> Loc1, Seller -> Seller2]|10          |
+----+----+--------+-------+--------+----------------------+--------------------+--------------------------------------------------+------------+

6bc51xsx (answer 2)

You can collect all the distinct "GroupBYClause" values, build a window for each one, and wrap them together with when(..).otherwise(..):

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val distinctGroupBY = df.select("GroupBYClause").distinct().as(Encoders.STRING).collect()

val sumQuantityColumn = distinctGroupBY
  .foldLeft(lit(0))(
    (acc, fieldList) =>
      when($"GroupBYClause" === fieldList,
        sum("Quantity").over(Window.partitionBy("UserId", fieldList.split(",").map(_.trim): _*)))
        .otherwise(acc)
  )

val result = df.withColumn("Sum(Quantity)", sumQuantityColumn)
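What the foldLeft builds is one nested when(clause, window sum).otherwise(...) expression: every distinct clause gets its own window, and each row picks the sum from the window matching its clause value. A pure-Python sketch of the same selection (the clause strings here use the exact column-name casing, since plain dict lookups, unlike Spark's resolver, are case-sensitive):

```python
# Pure-Python sketch of answer 2: for every distinct GroupBYClause, precompute
# per-partition sums; each row then reads the sum from "its" clause's window.
from collections import defaultdict

rows = [
    {"UserId": 1, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller1", "Quantity": 10, "GroupBYClause": "Shop, Location"},
    {"UserId": 1, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller2", "Quantity": 10, "GroupBYClause": "Shop, Location"},
    {"UserId": 2, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller1", "Quantity": 10, "GroupBYClause": "Shop, Location, Seller"},
    {"UserId": 2, "Shop": "ABC", "Location": "Loc1", "Seller": "Seller2", "Quantity": 10, "GroupBYClause": "Shop, Location, Seller"},
    {"UserId": 3, "Shop": "BCD", "Location": "Loc1", "Seller": "Seller1", "Quantity": 10, "GroupBYClause": "Location"},
    {"UserId": 3, "Shop": "BCD", "Location": "Loc1", "Seller": "Seller2", "Quantity": 10, "GroupBYClause": "Location"},
    {"UserId": 3, "Shop": "CDE", "Location": "Loc2", "Seller": "Seller3", "Quantity": 10, "GroupBYClause": "Location"},
]

# one "window" (a per-partition sum table) per distinct clause
windows = {}
for clause in {r["GroupBYClause"] for r in rows}:
    cols = [c.strip() for c in clause.split(",")]
    sums = defaultdict(int)
    for r in rows:
        sums[(r["UserId"],) + tuple(r[c] for c in cols)] += r["Quantity"]
    windows[clause] = (cols, sums)

# per row: the chained when(...) picks the matching clause's window sum
for r in rows:
    cols, sums = windows[r["GroupBYClause"]]
    r["Sum(Quantity)"] = sums[(r["UserId"],) + tuple(r[c] for c in cols)]
```

Note that, as with the Spark version, each window sums over the whole dataset partitioned by its columns; rows belonging to other clauses simply never read from it.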

dldeef67 (answer 3)

The cube function is useful here. Take a look:


# Import and test data

import pyspark.sql.functions as F
from pyspark.sql.types import *
tst = spark.createDataFrame([
    (1, "ABC","Loc1","Seller1", 10, "shop,location"),
    (1, "ABC","Loc1","Seller2", 10, "shop,location"),
    (2, "ABC","Loc1","Seller1", 10, "shop,location,seller"),
    (2, "ABC","Loc1","Seller2", 10, "shop,location,seller"),
    (3, "BCD","Loc1","Seller1", 10, "location"),
    (3, "BCD","Loc1","Seller2", 10, "location"),
    (3, "CDE","Loc2","Seller3", 10, "location")
  ]).toDF("UserId","shop", "location","seller", "quantity", "GroupBYClause")

# split the groupby clause columns to use in udf

tst_in = tst.withColumn("grp_arr",F.split('GroupBYClause',','))

# udf to fetch the values of column in groupby clause

@F.udf(ArrayType(StringType()))
def gen_arr(row,group_clause):
    res=[row[x] for x in group_clause]
    return(res)

# The struct can also be constructed for a large column list by list comprehension

tst_in1 = tst_in.withColumn("grp_coln_arr",gen_arr(F.struct(F.col('shop'),F.col('location'),F.col('seller')),F.col('grp_arr')))

# %% perform a cubed aggregation

tst_cub = tst.cube('shop','location','seller').agg(F.sum('quantity').alias('sum_q')).fillna('special_character')

# %% in the cubed aggregation, find the relevant rows

tst_cub1 = tst_cub.withColumn('grp_arr_cub', F.array('shop','location','seller'))
tst_cub2 = tst_cub1.withColumn("grp_arr_cub1",F.array_remove(F.col("grp_arr_cub"),'special_character'))

# %% select the needed quantities using join

tst_res = tst_in1.join(tst_cub2,tst_in1.grp_coln_arr==tst_cub2.grp_arr_cub1,how='left')

Results (I have included all the columns so you can follow what happens; make sure to drop the ones you don't need if you use this):

In [99]: tst_res.show()
+------+----+--------+-------+--------+--------------------+--------------------+--------------------+-----------------+--------+-----------------+-----+--------------------+--------------------+
|UserId|shop|location| seller|quantity|       GroupBYClause|             grp_arr|        grp_coln_arr|             shop|location|           seller|sum_q|         grp_arr_cub|        grp_arr_cub1|
+------+----+--------+-------+--------+--------------------+--------------------+--------------------+-----------------+--------+-----------------+-----+--------------------+--------------------+
|     2| ABC|    Loc1|Seller1|      10|shop,location,seller|[shop, location, ...|[ABC, Loc1, Seller1]|              ABC|    Loc1|          Seller1|   20|[ABC, Loc1, Seller1]|[ABC, Loc1, Seller1]|
|     2| ABC|    Loc1|Seller2|      10|shop,location,seller|[shop, location, ...|[ABC, Loc1, Seller2]|              ABC|    Loc1|          Seller2|   20|[ABC, Loc1, Seller2]|[ABC, Loc1, Seller2]|
|     3| BCD|    Loc1|Seller1|      10|            location|          [location]|              [Loc1]|special_character|    Loc1|special_character|   60|[special_characte...|              [Loc1]|
|     3| BCD|    Loc1|Seller2|      10|            location|          [location]|              [Loc1]|special_character|    Loc1|special_character|   60|[special_characte...|              [Loc1]|
|     3| CDE|    Loc2|Seller3|      10|            location|          [location]|              [Loc2]|special_character|    Loc2|special_character|   10|[special_characte...|              [Loc2]|
|     1| ABC|    Loc1|Seller1|      10|       shop,location|    [shop, location]|         [ABC, Loc1]|              ABC|    Loc1|special_character|   40|[ABC, Loc1, speci...|         [ABC, Loc1]|
|     1| ABC|    Loc1|Seller2|      10|       shop,location|    [shop, location]|         [ABC, Loc1]|              ABC|    Loc1|special_character|   40|[ABC, Loc1, speci...|         [ABC, Loc1]|
+------+----+--------+-------+--------+--------------------+--------------------+--------------------+-----------------+--------+-----------------+-----+--------------------+--------------------+
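The cube can be imitated in plain Python with itertools: sum quantity over every subset of (shop, location, seller), then look each row up in the cell addressed by its own clause columns. Note that, as in the output above, this aggregates across users (sum_q is 40 and 60, not the question's per-user 20), so UserId would have to be added to the cube columns to reproduce the expected output. A sketch:

```python
# Pure-Python imitation of the cube + join in answer 3: sum quantity over
# every subset of the grouping columns, then match each row to the subset
# named by its GroupBYClause.
from itertools import combinations
from collections import defaultdict

COLS = ("shop", "location", "seller")
rows = [
    (1, "ABC", "Loc1", "Seller1", 10, "shop,location"),
    (1, "ABC", "Loc1", "Seller2", 10, "shop,location"),
    (2, "ABC", "Loc1", "Seller1", 10, "shop,location,seller"),
    (2, "ABC", "Loc1", "Seller2", 10, "shop,location,seller"),
    (3, "BCD", "Loc1", "Seller1", 10, "location"),
    (3, "BCD", "Loc1", "Seller2", 10, "location"),
    (3, "CDE", "Loc2", "Seller3", 10, "location"),
]

# "cube": one sum table per subset of COLS (including the empty subset)
cube = defaultdict(lambda: defaultdict(int))
for size in range(len(COLS) + 1):
    for subset in combinations(COLS, size):
        for uid, shop, loc, seller, qty, clause in rows:
            vals = dict(zip(COLS, (shop, loc, seller)))
            cube[subset][tuple(vals[c] for c in subset)] += qty

# join: look each row up in the cube cell addressed by its own clause
result = []
for uid, shop, loc, seller, qty, clause in rows:
    subset = tuple(c.strip() for c in clause.split(","))
    vals = dict(zip(COLS, (shop, loc, seller)))
    result.append(cube[subset][tuple(vals[c] for c in subset)])
```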
