pyspark按列对元素进行分组并创建字典

xpcnnkqh  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(451)

我有一个从csv文件读取的sparkDataframe,如下所示:

df = ss.read \
     .format("csv") \
     .option("delimiter", ";") \
     .option("header", "false") \
     .option("inferSchema", "true") \
     .option("escape", "\"") \
     .option("multiline", "true") \
     .option("wholeFile", "true") \
     .load(file_path)

Dataframe如下所示:

|cod_cli|article_name|rank|
|123    |art_1       |1   |
|123    |art_2       |2   |
|123    |art_3       |3   |
|456    |art_4       |1   |
|456    |art_5       |2   |
|456    |art_6       |3   |

我想按column cod\u cli对元素进行分组,并创建多个列,每个列对应于分组集中的每个产品,作为一个值,一个dictionary key value,其中key作为列名,value作为与该列名相关的值,如下所示:

|cod_cli|Product 1                  |Product 2                  |Product 3                  |
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|

字典值可以是字符串(更好)或Map。我试着这样做:

df = df \
     .groupBy(F.col("cod_cli")) \
     .agg(F.collect_list(F.array("cod_art","rank")))

但是通过这种方式,我创建了一个包含所有分组元素的数组列的列。
有人能帮我吗?
谢谢您
更新
建议的解决方案是:

df = df.withColumn(
            "Product",
            F.to_json(
                F.struct(F.col("cod_art"), F.col("rank"))
            )
        )

通过这种方式,我创建了一个列“product”,其中包含所需的json字符串,例如 {cod_art : art_1, rank : 1} .
然后:

df = df \
     .groupBy(F.col("cod_cli")) \
     .pivot("rank") \
     .agg(F.first("Product"))

通过这种方式,我可以为每个产品创建一个列,按cod\u cli属性分组,并处理将3个以上产品作为列的情况:

|cod_cli|1                          |2                          |3               
|123    |{cod_art : art_1, rank : 1}|{cod_art : art_2, rank : 2}|{cod_art : art_3, rank : 3}|
|456    |{cod_art : art_4, rank : 1}|{cod_art : art_5, rank : 2}|{cod_art : art_6, rank : 3}|
wa7juj8i

wa7juj8i1#

也许这是有用的-

加载提供的数据

val data =
      """
        |cod_cli|article_name|rank
        |123    |art_1       |1
        |123    |art_2       |2
        |123    |art_3       |3
        |456    |art_4       |1
        |456    |art_5       |2
        |456    |art_6       |3
      """.stripMargin
    val stringDS = data.split(System.lineSeparator())
            .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS)

    df.show(false)
    df.printSchema()

    /**
      * +-------+------------+----+
      * |cod_cli|article_name|rank|
      * +-------+------------+----+
      * |123    |art_1       |1   |
      * |123    |art_2       |2   |
      * |123    |art_3       |3   |
      * |456    |art_4       |1   |
      * |456    |art_5       |2   |
      * |456    |art_6       |3   |
      * +-------+------------+----+
      *
      * root
      * |-- cod_cli: integer (nullable = true)
      * |-- article_name: string (nullable = true)
      * |-- rank: integer (nullable = true)
      */

使用pivot和first创建指定的列(应该在pyspark中实现,只需做最小的更改所有的都是pyspark.sql.functions)

df.groupBy("cod_cli")
      .pivot("rank")
      .agg(first("article_name"))
      .select($"cod_cli", $"1".as("Product 1"), $"2".as("Product 2"), $"3".as("Product 3"))
      .withColumn("Product 1", to_json(expr("named_struct('cod_art', `Product 1`, 'rank', '1')")))
      .withColumn("Product 2", to_json(expr("named_struct('cod_art', `Product 2`, 'rank', '2')")))
      .withColumn("Product 3", to_json(expr("named_struct('cod_art', `Product 3`, 'rank', '3')")))
      .show(false)

    /**
      * +-------+------------------------------+------------------------------+------------------------------+
      * |cod_cli|Product 1                     |Product 2                     |Product 3                     |
      * +-------+------------------------------+------------------------------+------------------------------+
      * |123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|
      * |456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|
      * +-------+------------------------------+------------------------------+------------------------------+
      */
1szpjjfi

1szpjjfi2#

你可以不用 pivot (昂贵的操作),使用 collect_liststruct ,那么 to_jsoncreate_map .

from pyspark.sql import functions as F

df\
  .groupBy("cod_cli").agg(F.collect_list(F.struct("article_name","rank"))\
                          .alias("arr"))\
  .select("cod_cli", *(F.to_json(F.create_map(F.lit("cod_art"),(F.col("arr.article_name")[x]),F.lit("rank"),(F.col("arr.rank")[x])))\
                       .alias("Product{}".format(x+1)) for x in range(3)))\
  .show(truncate=False)

# +-------+------------------------------+------------------------------+------------------------------+

# |cod_cli|Product1                      |Product2                      |Product3                      |

# +-------+------------------------------+------------------------------+------------------------------+

# |123    |{"cod_art":"art_1","rank":"1"}|{"cod_art":"art_2","rank":"2"}|{"cod_art":"art_3","rank":"3"}|

# |456    |{"cod_art":"art_4","rank":"1"}|{"cod_art":"art_5","rank":"2"}|{"cod_art":"art_6","rank":"3"}|

# +-------+------------------------------+------------------------------+------------------------------+

相关问题