我已经尝试了一整天,但没有运气到目前为止。已经用RDD做过了,但它不是真正可读的,所以当涉及到代码可读性时,这种方法会更好。
取这个初始DF和结果DF,包括初始DF和执行.groupBy()
后我想要获得的DF。
case class SampleRow(name:String, surname:String, age:Int, city:String)
case class ResultRow(name: String, surnamesAndAges: Map[String, (Int, String)])
val df = List(
SampleRow("Rick", "Fake", 17, "NY"),
SampleRow("Rick", "Jordan", 18, "NY"),
SampleRow("Sandy", "Sample", 19, "NY")
).toDF()
val resultDf = List(
ResultRow("Rick", Map("Fake" -> (17, "NY"), "Jordan" -> (18, "NY"))),
ResultRow("Sandy", Map("Sample" -> (19, "NY")))
).toDF()
到目前为止,我尝试执行以下.groupBy
.
val resultDf = df
.groupBy(
Name
)
.agg(
functions.map(
selectColumn(Surname),
functions.array(
selectColumn(Age),
selectColumn(City)
)
)
)
但是,以下内容会提示进入控制台。
Exception in thread "main" org.apache.spark.sql.AnalysisException: expression '`surname`' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() (or first_value) if you don't care which value you get.;;
然而,这样做会导致每个姓氏都有一个条目,我想把它们累积在一个Map中,就像你在resultDf
中看到的那样。有没有一个简单的方法来实现这一点使用DF?
4条答案
按热度按时间6za6bjd01#
你可以用一个UDF来实现它,将你的数据转换为Map:
gev0vcfq2#
如果您不关心将Dataframe类型转换为DataSet(在本例中为
ResultRow
,您可以这样做然后,您可以创建一个用户定义的函数,如下所示:
现在您可以使用
.withColumn
并将其称为udf数据框架看起来像这样
qlckcl4x3#
从Spark 2.4开始,您不需要使用Spark用户定义函数:
说明
首先从所需列中添加一个包含Map条目的列。Map条目仅仅是包含两列的
struct
:第一列是键,第二列是值。您可以将另一个struct
作为值。因此,在这里,Map条目将使用列surname
作为键,列age
和city
中的struct
作为值:然后,使用函数
collect_set
收集按groupBy键(列name
)分组的所有Map条目,并使用函数map_from_entries
将此Map条目列表转换为Maprkkpypqq4#
使用spark-sql & aggregate()函数:
根据给定的数据,模式的结果是
如果您不知道如何使map()具有上述模式,一种迂回的方法是使用slice()
如果你想使用map_from_entries(),那么子表t2中的map应该改为“struct”。下面的查询工作。