如何根据条件将spark scalaDataframe多行合并为一行

vjrehmav  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(1155)

关闭。这个问题需要细节或清晰。它目前不接受答案。
**想改进这个问题吗?**通过编辑这个帖子来添加细节并澄清问题。

4个月前关门了。
改进这个问题
Dataframe1:

12345,B,C,2020-08-12,Internet
12345,B,D,2002-11-12,Mobile
12345,B,e,2003-10-12,Lap

Dataframe2

12345

我必须连接dataframe1和dataframe2,并在df2的每个记录的输出中为每个记录生成一行。我的输出应该如下所示,
输出:

12345,Y,Y,2002-11-12,Mobile

列的条件,
第1列-不同值
第2列-如果df1的第2列中的所有值=='b',则在输出中填充'y',否则为'n'
第3列-如果df1的第3列中的任何值=='c',则在输出中填充'y',否则为'n'
第4列-从df1的第4列开始取最小日期
第5列-从df1的第5列填充与最小日期对应的值
如何做到这一点?

ql3eal8s

ql3eal8s1#

您可以通过使用groupby-then-aggapi轻松实现它。

import org.apache.spark.sql.functions._
import spark.implictis._
val dataframe2 = dataframe1.groupBy("_c0")
 .agg(when(size(array_distinct(collect_list('_c1))) === lit("1") and array_contains(array_distinct(collect_list('_c1)),'B'),lit("Y")).otherwise(lit("N"))
 ,when(array_contains(collect_list('_c1),'C'),lit("Y")).otherwise(lit("N"))
 ,min('_c3).alias("date"))

然后加入 dataframe2.join(dataframe1,Seq("_c0","dateColumn"),"inner") 并从中选择所有列 dataframe2(select(dataframe2("*"))) 选择“仅” dataframe1(dataframe1("_c4")) 你会得到想要的结果。。。
注意:在连接之前,请确保正确地别名dataframe2和dataframe1列
在groupby和aggregation中,不能访问父df的正常列。
否则:可以将Dataframe转换为rdd,然后将rdd转换为pairedrdd<key,object>,然后可以对成对的rdd执行reducebykey、aggregatebykey或groupbykey操作,并通过编写自定义代码来计算所有聚合。

相关问题