scala spark groupby/agg函数

62lalag4  于 2021-05-29  发布在  Spark
关注(0)|答案(1)|浏览(984)

我有两个数据集,我需要加入和执行操作,我不知道怎么做。对此的一个规定是,我没有org.apache.spark.sql.functions方法可用,因此必须使用dataset api
给定的输入是两个数据集,第一个数据集是customer类型,包含字段:customerid、forename、surname-all string
第二个数据集是事务的:customerid(string)、accountid(string)、amount(long)
customerid是链接
输出的数据集需要有以下字段:customerid(string)、forename(string)、surname(string)、transactions(类型transaction的列表)、transactioncount(int)、totaltransactionmount(double)、averagetransactionmount(double)
我知道我需要使用groupby,agg,以及最后的一些连接。有人能帮我指出正确的方向吗?谢谢

3htmauhk

3htmauhk1#

使用您拥有的信息是非常困难的,但是据我所知,您不想使用dataframe函数,而是用datasetapi实现所有功能,您可以通过以下方式来实现
使用joinwith连接这两个数据集,您可以在这里找到一个示例https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-joins.html#joinwith
聚合:我会使用 groupByKey 然后 mapGroups 像这样的

ds.groupByKey(x=>x.id).mapGroups { case (key,iter) => { 
        val list = iter.toList
        val totalTransactionAmount = ???
        val averageTransactionAmount = ??? 
        (key,totalTransactionAmount,averageTransactionAmount)
   } 
 }

希望这个示例能让您了解如何使用dataset api解决问题,并使其适应您的问题。

相关问题