如何在spark scala中使用子查询创建列表达式

vkc1a9a2  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(433)

给定任何df,我想为df计算另一个名为“has\u duplicates”的列,然后添加一个带有布尔值的列来判断每一行是否唯一。输入df示例:

val df = Seq((1, 2), (2, 5), (1, 7), (1, 2), (2, 5)).toDF("A", "B")

给出一个输入 columns: Seq[String] ,我知道如何获得每行的计数:

val countsDf = df.withColumn("count", count("*").over(Window.partitionBy(columns.map(col(_)): _*)))

但我不知道如何使用它为最后一列创建一个列表达式,指示每一行是否唯一。
像这样的

def getEvaluationExpression(df: DataFrame): Column = {
    when("count > 1", lit("fail").otherwise(lit("pass"))
 }

但是需要使用上面的查询当场计算计数。

oxcyiej7

oxcyiej71#

试试下面的代码。

scala> df.withColumn("has_duplicates", when(count("*").over(Window.partitionBy(df.columns.map(col(_)): _*)) > 1 , lit("fail")).otherwise("pass")).show(false)
+---+---+--------------+
|A  |B  |has_duplicates|
+---+---+--------------+
|1  |7  |pass          |
|1  |2  |fail          |
|1  |2  |fail          |
|2  |5  |fail          |
|2  |5  |fail          |
+---+---+--------------+

scala> df.withColumn("count",count("*").over(Window.partitionBy(df.columns.map(col(_)): _*))).withColumn("has_duplicates", when($"count" > 1 , lit("fail")).otherwise("pass")).show(false)
+---+---+-----+--------------+
|A  |B  |count|has_duplicates|
+---+---+-----+--------------+
|1  |7  |1    |pass          |
|1  |2  |2    |fail          |
|1  |2  |2    |fail          |
|2  |5  |2    |fail          |
|2  |5  |2    |fail          |
+---+---+-----+--------------+

相关问题