scala尽可能选择最佳记录

unhi4e5o  于 2021-05-16  发布在  Spark
关注(0)|答案(1)|浏览(340)

在如下目录中有不同的文件
f1.txt文件

id FName Lname Adrress sex levelId

t1 Girish Hm 10oak m 1111 

t2 Kiran Kumar 5wren m 2222

t3 sara chauhan 15nvi f 6666

f2.txt文件

t4 girish hm 11oak m 1111 

t5 Kiran Kumar 5wren f 2222

t6 Prakash Jha 18nvi f 3333

f3.txt文件

t7 Kiran Kumar 5wren f 2222

t8 Girish Hm 10oak m 1111

t9 Prakash Jha 18nvi m 3333

f4.txt文件

t10 Kiran Kumar 5wren f 2222 

t11 girish hm 10oak m 1111

t12 Prakash Jha 18nvi f 3333

只有名字和姓氏常量在这里和大小写应该被忽略,其他地址,性别,levelid可以更改。
数据应首先基于fname和lname进行分组

t1 Girish Hm 10oak m 1111 

t4 girish hm 11oak m 1111 

t8 Girish Hm 10oak m 1111

t11 girish hm 10oak m 1111

t2 Kiran Kumar 5wren m 2222

t5 Kiran Kumar 5wren f 2222

t7 Kiran Kumar 5wren f 2222

t10 Kiran Kumar 5wren f 2222 

t3 sara chauhan 15nvi f 6666

t6 Prakash Jha 18nvi f 3333

t9 Prakash Jha 18nvi m 3333

t12 Prakash Jha 18nvi f 33

稍后,我们需要根据address、sex、levelid列的值的频率从每个组中选择适当的第一条记录
例如:对于女孩般的人

10oak has maximum frequency from address

m has maximum frequency from gender

1111 has maximum frequency from LevelID.

因此,带t1的id将是正确的记录(考虑到需要从组中选择第一个合适的记录)
最终输出应为:

t1 Girish Hm 10oak m 1111

t5 Kiran Kumar 5wren f 2222

t3 sara chauhan 15nvi f 6666

t6 Prakash Jha 18nvi f 3333
6ioyuze2

6ioyuze21#

scala解决方案:
首先定义感兴趣的列:

val cols = Array("Adrress", "sex", "levelId")

然后使用

df.select(
    cols.map(
        x => array(
            count(x).over(
                Window.partitionBy(
                    lower(col("FName")),
                    lower(col("LName")),
                    col(x) 
                )
            ),
            col(x)
        ).alias(x ++ "_freq")
    )
)

然后按每个人分组并聚合以获得最大频率:(忽略伪agg,这是由于agg函数需要一个参数和一堆其他参数)

.groupBy(
    lower(col("FName")).alias("FName"),
    lower(col("LName")).alias("LName"))
.agg(
    count($"*").alias("dummy"),
    cols.map(
        x => max(col(x ++ "_freq"))(1).alias(x)
    ): _*
)
.drop("dummy"))

总体代码:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val cols = Array("Adrress", "sex", "levelId")

val df = spark.read.option("header", "true").option("delimiter", " ").option("inferSchema", "true").csv("names.txt")

val df2 = (df
.select(col("*") +: cols.map(x => array(count(x).over(Window.partitionBy(lower(col("FName")), lower(col("LName")), col(x))), col(x)).alias(x ++ "_freq")): _*)
.groupBy(lower(col("FName")).alias("FName"), lower(col("LName")).alias("LName"))
.agg(count($"*").alias("dummy"), cols.map(x => max(col(x ++ "_freq"))(1).alias(x)): _*)
.drop("dummy"))

df2.show
+-------+-------+-------+---+-------+
|  FName|  LName|Adrress|sex|levelId|
+-------+-------+-------+---+-------+
|   sara|chauhan|  15nvi|  f|   6666|
|prakash|    jha|  18nvi|  f|   3333|
| girish|     hm|  10oak|  m|   1111|
|  kiran|  kumar|  5wren|  f|   2222|
+-------+-------+-------+---+-------+

相关问题