如何在pyspark中获得Map值?

zkure5ic  于 2021-05-29  发布在  Spark
关注(0)|答案(2)|浏览(410)

我有一个Dataframe,我用它来做我所有的计算,它有一个id列和一个name列

id | name
1  | Alex
2  | Bob
3  | Chris
4  | Kevin

我做了一系列的操作,得到了他们最亲密的朋友,这是一个列表对的形式 [id, score] ```
id | friends
1 | [[2, 49], [3, 15]]
2 | [[4, 61], [2, 49], [3, 4]]

如何将此朋友列表Map到姓名列表?现在可以放弃分数了。理想的情况是

id | friends
1 | [Bob, Chris]
2 | [Kevin, Bob, Chris]

我想加入,但我不知道这是怎么回事,因为它是一个列表
nzrxty8p

nzrxty8p1#

看看这个

df = spark.createDataFrame([('1' , 'Alex'), ('2' , 'Bob'), ('3' , 'Chris'), ('4' , 'Kevin')],['id' , 'name'])
df2 = spark.createDataFrame([('1',[[2, 49], [3, 15]]), ('2',[[4, 61], [2, 49], [3, 4]])], ['id' , 'friends'])

df3 = df2.select('id','friends' ,f.expr('''explode(transform(friends,x->x[0])) as friend'''))

df3.join(df,df.id.cast('int')==df3.friend.cast('int')).groupBy(df3.id,df3.friends).agg(f.collect_list('name').alias('friend')).show()

+---+--------------------+-------------------+
| id|             friends|             friend|
+---+--------------------+-------------------+
|  1|  [[2, 49], [3, 15]]|       [Chris, Bob]|
|  2|[[4, 61], [2, 49]...|[Chris, Kevin, Bob]|
+---+--------------------+-------------------+
i2loujxw

i2loujxw2#

也许这很有用(用scala编写)
添加了dimension dataframe表示小数据集和大数据集时的选项

1. 加载两个Dataframe

val data1 =
      """
        |id | name
        |1  | Alex
        |2  | Bob
        |3  | Chris
        |4  | Kevin
      """.stripMargin

    val stringDS1 = data1.split(System.lineSeparator())
      .map(_.split("\\|").map(_.replaceAll("""^[ \t]+|[ \t]+$""", "")).mkString(","))
      .toSeq.toDS()
    val df1 = spark.read
      .option("sep", ",")
      .option("inferSchema", "true")
      .option("header", "true")
      .option("nullValue", "null")
      .csv(stringDS1)
    df1.show(false)
    df1.printSchema()
    /**
      * +---+-----+
      * |id |name |
      * +---+-----+
      * |1  |Alex |
      * |2  |Bob  |
      * |3  |Chris|
      * |4  |Kevin|
      * +---+-----+
      *
      * root
      * |-- id: integer (nullable = true)
      * |-- name: string (nullable = true)
      */

    val df2 =
      spark.sql(
        """
          |select id, friends from values
          | (1, array(named_struct('id', 2, 'score', 49), named_struct('id', 3, 'score', 15))),
          | (2, array(named_struct('id', 4, 'score', 61), named_struct('id', 2, 'score', 49), named_struct('id', 3,
          | 'score', 4)))
          | T(id, friends)
        """.stripMargin)
    df2.show(false)
    df2.printSchema()
    /**
      * +---+--------------------------+
      * |id |friends                   |
      * +---+--------------------------+
      * |1  |[[2, 49], [3, 15]]        |
      * |2  |[[4, 61], [2, 49], [3, 4]]|
      * +---+--------------------------+
      *
      * root
      * |-- id: integer (nullable = false)
      * |-- friends: array (nullable = false)
      * |    |-- element: struct (containsNull = false)
      * |    |    |-- id: integer (nullable = false)
      * |    |    |-- score: integer (nullable = false)
      */

2. 如果维度数据很大

// if df1 has big data
    val exploded = df2.select($"id", explode(expr("friends.id")).as("friend_id"))
      exploded.join(df1, exploded("friend_id")===df1("id"))
      .groupBy(exploded("id"))
      .agg(collect_list($"name").as("friends"))
      .show(false)
    /**
      * +---+-------------------+
      * |id |friends            |
      * +---+-------------------+
      * |2  |[Bob, Chris, Kevin]|
      * |1  |[Bob, Chris]       |
      * +---+-------------------+
      */

3. 如果维度dataframe很小

// if df1 is small
    val b = spark.sparkContext.broadcast(df1.collect().map{case Row(id: Int, name: String) => id -> name}.toMap)

    val getFriendsName = udf((idArray: mutable.WrappedArray[Int]) => idArray.map(b.value(_)))

    df2.withColumn("friends", getFriendsName(expr("friends.id")))
      .show(false)

    /**
      * +---+-------------------+
      * |id |friends            |
      * +---+-------------------+
      * |1  |[Bob, Chris]       |
      * |2  |[Kevin, Bob, Chris]|
      * +---+-------------------+
      */

相关问题