pyspark: How to run a window function in PySpark

hgqdbh6s asked on 2023-03-28 in Spark

I have the structure below. I want to group the data by ENTER12333_NUM, and if any row in a group has a ROW_OPERATION of I, U, or D, mark RECALC_ENTER12333_CALCUALTION as Y for the whole group.

+---------+---------+--------------------+-----------------+---------+-----------------+--------------+-----------------------------+-------------+---------------+
|CURR_COL1|CURR_COL2|          HASH_VALUE|        CURR_COL3|CURR_COL4|       CURR_COL45|ENTER12333_NUM|RECALC_ENTER12333_CALCUALTION|ROW_OPERATION|WINNER_IN222222|
+---------+---------+--------------------+-----------------+---------+-----------------+--------------+-----------------------------+-------------+---------------+
|    75757|    hello|9bc7d98d527bb54b1...|    79.0000000000|       pb|    55.0000000000|440.0000000000|                         null|            I|           null|
|       46|    hello|9bc7d98d527bb54b1...|    79.0000000000|       pb|    55.0000000000|590.0000000000|                         null|            I|           null|
|     4545| Senorita|d95ee5d8db9958f6e...|    79.0000000000|     null|    79.0000000000|590.0000000000|                         null|            U|           null|
|   189899|    hello|a93d52dad9dcc3bd0...|    79.0000000000|  Purnima|    79.0000000000|890.0000000000|                         null|            N|           null|
|   234223|  goodbye|4271325117076d7b5...|454646.0000000000|   bhatia|454646.0000000000|890.0000000000|                         null|            D|           null|
+---------+---------+--------------------+-----------------+---------+-----------------+--------------+-----------------------------+-------------+---------------+

I can write the equivalent SQL query:

select a.*,
       case when sum(case when ROW_OPERATION in ('I','U','D') then 1 else 0 end)
                 over (partition by ENTER12333_NUM) > 0
            then 'Y'
            else 'N'
       end as RECALC_ENTER12333_CALCUALTION
from delta_firmographic_data a

How can we achieve the same thing with the PySpark DataFrame API?
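I know the SQL runs as-is if I register the DataFrame as a temporary view (a minimal sketch, assuming an active SparkSession named spark and the data already loaded in df):

# Sketch: run the SQL above unchanged through a temporary view.
# Assumes an active SparkSession named `spark` and the source data in `df`.
df.createOrReplaceTempView("delta_firmographic_data")

spark.sql("""
    select a.*,
           case when sum(case when ROW_OPERATION in ('I','U','D') then 1 else 0 end)
                     over (partition by ENTER12333_NUM) > 0
                then 'Y' else 'N'
           end as RECALC_ENTER12333_CALCUALTION
    from delta_firmographic_data a
""").show()

What I am looking for is the pure DataFrame equivalent.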


2ekbmq32 (answer 1)

In most cases you can rewrite SQL as DataFrame operations, but it takes some practice to recognize the patterns.
Take a look at my example. I used only a single row and did not pay much attention to the data types; your dataframe is too wide to rewrite by hand. If you need more test data, please provide it as Python.
In any case, this should do the job, or at least give you a good starting point :)

import pyspark.sql.functions as F
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.getOrCreate()

# One sample row mirroring the question's data; the string "null" stands in
# for missing values so schema inference has a concrete type to work with.
data = [
    {
        "CURR_COL1": "75757",
        "CURR_COL2": "hello",
        "HASH_VALUE": "9bc7d98d527bb54b1",
        "CURR_COL3": "79.0000000000",
        "CURR_COL4": "pb",
        "CURR_COL45": "55.0000000000",
        "ENTER12333_NUM": "440.0000000000",
        "RECALC_ENTER12333_CALCUALTION": "null",
        "ROW_OPERATION": "I",
        "WINNER_IN222222": "null",
    },
]

df = spark.createDataFrame(data)

# Count the I/U/D rows within each ENTER12333_NUM group; if the count is
# positive, every row in the group gets Y, otherwise N.
df.withColumn(
    "RECALC_ENTER12333_CALCUALTION",
    F.when(
        F.sum(
            F.when(F.col("ROW_OPERATION").isin("I", "U", "D"), F.lit(1)).otherwise(F.lit(0))
        ).over(Window.partitionBy("ENTER12333_NUM")) > 0,
        F.lit("Y"),
    ).otherwise(F.lit("N")),
).show()

Output:

+---------+---------+-------------+---------+-------------+--------------+-----------------+-----------------------------+-------------+---------------+
|CURR_COL1|CURR_COL2|    CURR_COL3|CURR_COL4|   CURR_COL45|ENTER12333_NUM|       HASH_VALUE|RECALC_ENTER12333_CALCUALTION|ROW_OPERATION|WINNER_IN222222|
+---------+---------+-------------+---------+-------------+--------------+-----------------+-----------------------------+-------------+---------------+
|    75757|    hello|79.0000000000|       pb|55.0000000000|440.0000000000|9bc7d98d527bb54b1|                            Y|            I|           null|
+---------+---------+-------------+---------+-------------+--------------+-----------------+-----------------------------+-------------+---------------+
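A slightly shorter equivalent (a sketch under the same setup as above) replaces the conditional sum with the maximum of a 0/1 flag over the same window:

w = Window.partitionBy("ENTER12333_NUM")

# Sketch: isin() yields a boolean; cast it to 0/1 and take the per-group
# maximum, which is 1 as soon as any row in the group is I, U, or D.
df.withColumn(
    "RECALC_ENTER12333_CALCUALTION",
    F.when(
        F.max(F.col("ROW_OPERATION").isin("I", "U", "D").cast("int")).over(w) == 1,
        F.lit("Y"),
    ).otherwise(F.lit("N")),
).show()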
