Scala: aggregate all column values into a Map after groupBy in Apache Spark

n9vozmp4, posted 2023-10-18 in Scala

I have been trying this all day with no luck so far. I already did it with an RDD, but it isn't really readable, so this approach would be much better in terms of code readability.
Take these initial and result DFs: the starting DF and the one I would like to obtain after performing .groupBy().

case class SampleRow(name:String, surname:String, age:Int, city:String)
case class ResultRow(name: String, surnamesAndAges: Map[String, (Int, String)])

val df = List(
  SampleRow("Rick", "Fake", 17, "NY"),
  SampleRow("Rick", "Jordan", 18, "NY"),
  SampleRow("Sandy", "Sample", 19, "NY")
).toDF()

val resultDf = List(
  ResultRow("Rick", Map("Fake" -> (17, "NY"), "Jordan" -> (18, "NY"))),
  ResultRow("Sandy", Map("Sample" -> (19, "NY")))
).toDF()
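
(As a side note, the .toDF() calls above assume an active SparkSession with its implicits in scope; a minimal local setup sketch, where the app name and master are assumptions, would be:)

import org.apache.spark.sql.SparkSession

// Hypothetical local setup; any existing SparkSession works the same way
val spark = SparkSession.builder()
  .appName("groupBy-to-map")
  .master("local[*]")
  .getOrCreate()

import spark.implicits._  // enables .toDF() on the Lists of case classes above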

So far I have tried the following .groupBy:

val resultDf = df
  .groupBy(
    Name
  )
  .agg(
    functions.map(
      selectColumn(Surname),
      functions.array(
        selectColumn(Age),
        selectColumn(City)
      )
    )
  )

However, the following is printed to the console:

Exception in thread "main" org.apache.spark.sql.AnalysisException: expression '`surname`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;

However, doing it that way leaves me with a separate entry per surname, and I would like to accumulate them all in a single Map, as you can see in resultDf. Is there a straightforward way to achieve this using DataFrames?

Answer #1 (6za6bjd0)

You can do it with a UDF that turns your data into a Map:

import org.apache.spark.sql.functions.{collect_list, udf}

val toMap = udf((keys: Seq[String], values1: Seq[String], values2: Seq[String]) => {
  keys.zip(values1.zip(values2)).toMap
})

val myResultDF = df.groupBy("name")
  .agg(collect_list("surname") as "surname", collect_list("age") as "age", collect_list("city") as "city")
  .withColumn("surnamesAndAges", toMap($"surname", $"age", $"city"))
  .drop("age", "city", "surname")

myResultDF.show(false)
+-----+--------------------------------------+
|name |surnamesAndAges                       |
+-----+--------------------------------------+
|Sandy|[Sample -> [19, NY]]                  |
|Rick |[Fake -> [17, NY], Jordan -> [18, NY]]|
+-----+--------------------------------------+
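
If you also want the value types from the question (age as Int, city as String) so the result matches Map[String, (Int, String)], a sketch of the same idea is below; the toTypedMap name and the Seq[Int] signature are assumptions, not part of the original answer:

import org.apache.spark.sql.functions.{collect_list, udf}

// Sketch only: keep age as Int so the map value becomes struct<_1:int,_2:string>
val toTypedMap = udf((surnames: Seq[String], ages: Seq[Int], cities: Seq[String]) =>
  surnames.zip(ages.zip(cities)).toMap)

val typedResult = df.groupBy("name")
  .agg(collect_list("surname") as "surname", collect_list("age") as "age", collect_list("city") as "city")
  .withColumn("surnamesAndAges", toTypedMap($"surname", $"age", $"city"))
  .drop("age", "city", "surname")

Since the tuple encoder keeps the value fields named _1/_2, it should then also be possible to map this to the question's case class with typedResult.as[ResultRow].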

Answer #2 (gev0vcfq)

If you don't care about converting the DataFrame to a Dataset (ResultRow in this case), you can do something like this:

import org.apache.spark.sql.functions.{collect_list, struct, udf}

val grouped = df.withColumn("surnameAndAge", struct($"surname", $"age"))
  .groupBy($"name")
  .agg(collect_list("surnameAndAge").alias("surnamesAndAges"))

Then you can create a user-defined function like this:

import org.apache.spark.sql._
// Note: age is an Int in the sample data, so match the value as Int and render it as a String
val arrayToMap = udf[Map[String, String], Seq[Row]] { array =>
  array.map { case Row(key: String, value: Int) => (key, value.toString) }.toMap
}

Now you can call the UDF with .withColumn:

val finalData = grouped.withColumn("surnamesAndAges", arrayToMap($"surnamesAndAges"))

The DataFrame will then look like this:

finalData: org.apache.spark.sql.DataFrame = [name: string, surnamesAndAges: map<string,string>]
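
This variant drops the city and yields map<string,string>. If you also want the city so the values match the question's Map[String, (Int, String)], a hedged sketch of the same technique puts both fields into the inner struct and adjusts the Row pattern accordingly (the names groupedWithCity, arrayToTypedMap and finalWithCity are illustrative, not from the original answer):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{collect_list, struct, udf}

// Sketch only: carry (age, city) in the inner struct so the map value matches the question's tuple
val groupedWithCity = df
  .withColumn("surnameAgeCity", struct($"surname", struct($"age", $"city")))
  .groupBy($"name")
  .agg(collect_list("surnameAgeCity").alias("surnamesAndAges"))

val arrayToTypedMap = udf[Map[String, (Int, String)], Seq[Row]] { array =>
  array.map { case Row(surname: String, Row(age: Int, city: String)) => surname -> (age, city) }.toMap
}

val finalWithCity = groupedWithCity.withColumn("surnamesAndAges", arrayToTypedMap($"surnamesAndAges"))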

Answer #3 (qlckcl4x)

Since Spark 2.4, you no longer need a user-defined function for this:

import org.apache.spark.sql.functions.{col, collect_set, map_from_entries, struct}

df.withColumn("mapEntry", struct(col("surname"), struct(col("age"), col("city"))))
  .groupBy("name")
  .agg(map_from_entries(collect_set("mapEntry")).as("surnameAndAges"))

Explanation

First, add a column containing a map entry built from the desired columns. A map entry is simply a struct with two columns: the first column is the key and the second is the value, and the value may itself be another struct. So here, the map entry uses the column surname as the key and a struct of the columns age and city as the value:

struct(col("surname"), struct(col("age"), col("city")))

Then, use collect_set to collect all the map entries for each groupBy key (the column name), and use map_from_entries to turn this list of entries into a Map.
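
One small caveat worth noting: collect_set deduplicates identical entries and does not guarantee any ordering. If every surname is unique per name, collect_list can be used just as well, as in this small variant (sketch only):

import org.apache.spark.sql.functions.{col, collect_list, map_from_entries, struct}

// Same pipeline as above, using collect_list instead of collect_set
df.withColumn("mapEntry", struct(col("surname"), struct(col("age"), col("city"))))
  .groupBy("name")
  .agg(map_from_entries(collect_list("mapEntry")).as("surnameAndAges"))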


Answer #4 (rkkpypqq)

Using spark-sql and the aggregate() function:
For the given data, the result schema is:

resultDf.printSchema

root
 |-- name: string (nullable = true)
 |-- surnamesAndAges: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: string (nullable = true)

df.createOrReplaceTempView("person")

val dfr = spark.sql("""
with t1 ( select * from person ),
     t2 ( select name, map(surname,struct(age,city)) r1 from t1 ),
     t3 ( select name, collect_list(r1) r2 from t2 group by name )
     select name, aggregate(r2, cast(map() as map<string,struct<age:int,city:string>>), (acc,x) -> map_concat(acc,x) ) r3  from t3 
""")
dfr.show(false)

+-----+--------------------------------------+
|name |r3                                    |
+-----+--------------------------------------+
|Rick |{Fake -> {17, NY}, Jordan -> {18, NY}}|
|Sandy|{Sample -> {19, NY}}                  |
+-----+--------------------------------------+

If you are not sure how to give map() the schema shown above, a roundabout alternative is to use slice():

val dfrp = spark.sql("""
with t1 ( select * from person ),
     t2 ( select name, map(surname,struct(age,city)) r1 from t1 ),
     t3 ( select name, collect_list(r1) r2 from t2 group by name )
select name, aggregate(slice(r2,2,size(r2)), r2[0] , (acc,i) -> map_concat(acc,i)) r3  from t3 """)
dfrp.show(false)

If you want to use map_from_entries() instead, the map in subquery t2 should be changed to a struct. The query below works:

val dfp = spark.sql("""
with t1 ( select * from person ),
     t2 ( select name, struct(surname,struct(age,city)) r1 from t1 ),
     t3 ( select name, collect_list(r1) r2 from t2 group by name )
select name, map_from_entries(r2) r3  from t3 
""")

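For completeness, a small usage check, assuming the person temp view registered earlier is still available:

// Should print the same two-row result as the aggregate()-based query above
dfp.show(false)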