How to group and merge these rows of a Spark DataFrame

qeeaahzv · published 2021-05-27 · in Spark
Follow (0) | Answers (2) | Views (445)

Suppose I have a table like this,

A  | B  |    C     | D  |  E  | F
x1 | 5  | 20200115 | 15 | 4.5 | 1
x1 | 10 | 20200825 | 15 | 5.6 | 19
x2 | 10 | 20200115 | 15 | 4.1 | 1
x2 | 10 | 20200430 | 15 | 9.1 | 1

I am looking to merge these rows on column A to produce a DataFrame like this

A  | B  |    C     | D  |  E  | F
x1 | 15 | 20200825 | 15 | 5.6 | 19
x2 | 10 | 20200115 | 15 | 4.1 | 1
x2 | 10 | 20200430 | 15 | 9.1 | 1

Basically, within each group of column A, if the sum of column B equals the value of column D, then:
the new value of column B is that sum, and
columns C, E and F are taken from the row with the latest date in column C (dates are in yyyyMMdd format).
For group x2 the condition above does not hold (the sum of column B is greater than D, which is 15), so I want to keep both of its records unchanged in the target.
Assumption: in my data, column D is the same within a given group (15 in this example).
I have looked at a number of groupBy and window (partition) examples, but this seems different to me and I have not been able to narrow down an approach. Could I pipe the grouped data into a UDF and do something there?
PS: I am building this in PySpark, so it would be great if your example could be in PySpark.
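
For the grouped-UDF route mentioned above: Spark 3.x can hand each group to a pandas function via groupBy(...).applyInPandas. Below is a minimal sketch following the rules described in the question; the merge_group helper is a made-up name for illustration, and it assumes sum(B) is compared to D with strict equality.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

cols = ["A", "B", "C", "D", "E", "F"]
rows = [
    ("x1", 5, "20200115", 15, 4.5, 1),
    ("x1", 10, "20200825", 15, 5.6, 19),
    ("x2", 10, "20200115", 15, 4.1, 1),
    ("x2", 10, "20200430", 15, 9.1, 1),
]
df = spark.createDataFrame(rows, cols)

def merge_group(pdf):
    # pdf holds all rows for one value of A as a pandas DataFrame
    if pdf["B"].sum() == pdf["D"].iloc[0]:
        # condition met: keep only the row with the latest C and put the total in B
        latest = pdf.sort_values("C").tail(1).copy()
        latest["B"] = pdf["B"].sum()
        return latest
    # condition not met: keep the group as-is
    return pdf

# each group of A is processed by merge_group; the output schema matches the input
result = df.groupBy("A").applyInPandas(merge_group, schema=df.schema)
result.show()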

3xiyfsfu 1#

In PySpark, I would do it like this:

from pyspark.sql import functions as F, Window as W

b = ["A", "B", "C", "D", "E", "F"]
a = [
    ("x1", 5, "20200115", 15, 4.5, 1),
    ("x1", 10, "20200825", 15, 5.6, 19),
    ("x2", 10, "20200115", 15, 4.1, 1),
    ("x2", 10, "20200430", 15, 9.1, 1),
]

df = spark.createDataFrame(a, b)

# total of B within each group of A
df = df.withColumn("B_sum", F.sum("B").over(W.partitionBy("A")))

# groups whose B total fits within D get collapsed; the others are kept as-is
process_df = df.where("D >= B_sum")
no_process_df = df.where("D < B_sum").drop("B_sum")

# for the groups to collapse, keep only the row with the latest date in C
# and replace B with the group total
process_df = (
    process_df.withColumn(
        "rng", F.row_number().over(W.partitionBy("A").orderBy(F.col("C").desc()))
    )
    .where("rng = 1")
    .select("A", F.col("B_sum").alias("B"), "C", "D", "E", "F")
)

final_output = process_df.unionByName(no_process_df)
final_output.show()
+---+---+--------+---+---+---+
|  A|  B|       C|  D|  E|  F|
+---+---+--------+---+---+---+
| x1| 15|20200825| 15|5.6| 19|
| x2| 10|20200115| 15|4.1|  1|
| x2| 10|20200430| 15|9.1|  1|
+---+---+--------+---+---+---+
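
The approach splits the input into groups that can be collapsed (the total of B fits within D) and groups that cannot, collapses only the first set, and stitches the two back together with unionByName. Ordering by C as a string works here because zero-padded yyyyMMdd dates sort lexicographically in the same order as chronologically.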

9ceoxa92 2#

Try this: use sum + max with window functions (a PySpark sketch of the same idea follows after the output below).

// assumes a SparkSession named spark is in scope (as in spark-shell)
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

df.show(false)
df.printSchema()
/**
  * +---+---+--------+---+---+---+
  * |A  |B  |C       |D  |E  |F  |
  * +---+---+--------+---+---+---+
  * |x1 |5  |20200115|15 |4.5|1  |
  * |x1 |10 |20200825|15 |5.6|19 |
  * |x2 |10 |20200115|15 |4.1|1  |
  * |x2 |10 |20200430|15 |9.1|1  |
  * +---+---+--------+---+---+---+
  *
  * root
  * |-- A: string (nullable = true)
  * |-- B: integer (nullable = true)
  * |-- C: integer (nullable = true)
  * |-- D: integer (nullable = true)
  * |-- E: double (nullable = true)
  * |-- F: integer (nullable = true)
  */

// group total of B and latest C per value of A
val w = Window.partitionBy("A")
df.withColumn("sum", sum("B").over(w))
  .withColumn("latestC", max("C").over(w))
  // keep a row unless its group total matches D and it is not the latest row
  .withColumn("retain",
    when($"sum" === $"D", when($"latestC" === $"C", true).otherwise(false))
      .otherwise(true))
  .where($"retain" === true)
  // on the surviving merged row, replace B with the group total
  .withColumn("B", when($"sum" === $"D", when($"latestC" === $"C", $"sum").otherwise($"B"))
    .otherwise($"B"))
  .show(false)

/**
  * +---+---+--------+---+---+---+---+--------+------+
  * |A  |B  |C       |D  |E  |F  |sum|latestC |retain|
  * +---+---+--------+---+---+---+---+--------+------+
  * |x1 |15 |20200825|15 |5.6|19 |15 |20200825|true  |
  * |x2 |10 |20200115|15 |4.1|1  |20 |20200430|true  |
  * |x2 |10 |20200430|15 |9.1|1  |20 |20200430|true  |
  * +---+---+--------+---+---+---+---+--------+------+
  */
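
Since the question asks for PySpark, a rough PySpark equivalent of the same sum + max window idea might look like the sketch below (assuming a DataFrame df built as in the first answer; the extra helper columns are dropped at the end).

from pyspark.sql import functions as F, Window as W

w = W.partitionBy("A")
result = (
    df.withColumn("sum", F.sum("B").over(w))
      .withColumn("latestC", F.max("C").over(w))
      # keep every row unless its group total matches D and it is not the latest row
      .where((F.col("sum") != F.col("D")) | (F.col("latestC") == F.col("C")))
      # on the surviving merged row, replace B with the group total
      .withColumn("B", F.when(F.col("sum") == F.col("D"), F.col("sum")).otherwise(F.col("B")))
      .drop("sum", "latestC")
)
result.show()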
