python-3.x How to derive a column from a per-group condition in a PySpark DataFrame

owfi6suc posted on 2022-11-26 in Python

I have a DataFrame df that looks like this:

VehNum  Control_circuit control_circuit_status  partnumbers errors          Flag
4234456 DOC             ok                      A567UR      Software Issue  0
4234456 DOC             not_okay                A568UR      Software Issue  1
4234456 DOC             not_okay                A569UR      Hardware issue  2
4234457 ACR             ok                      A234TY      Hardware issue  0
4234457 ACR             ok                      A235TY      Hardware issue  0
4234457 ACR             ok                      A234TY      Hardware issue  0
4234487 QWR             ok                      A276TY      Hardware issue  0
4234487 QWR             not_okay                A872UR      Hardware issue  1
3423448 QWR             not_okay                A872UR      Hardware issue  1

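I want to add a new column called "Control_Flag", computed per (VehNum, Control_circuit) combination: if every Flag value in that group is 0, Control_Flag is 0; if the group contains any non-zero Flag (1 or 2), Control_Flag is 1 on every row of the group.
The expected result: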

VehNum  Control_circuit control_circuit_status  partnumbers errors          Flag Control_Flag
4234456 DOC             ok                      A567UR      Software Issue  0    1
4234456 DOC             not_okay                A568UR      Software Issue  1    1
4234456 DOC             not_okay                A569UR      Hardware issue  2    1
4234457 ACR             ok                      A234TY      Hardware issue  0    0
4234457 ACR             ok                      A235TY      Hardware issue  0    0
4234457 ACR             ok                      A234TY      Hardware issue  0    0
4234487 QWR             ok                      A276TY      Hardware issue  0    1
4234487 QWR             not_okay                A872UR      Hardware issue  1    1
3423448 QWR             not_okay                A872UR      Hardware issue  1    1
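
The rule amounts to "Control_Flag is 1 when any Flag in the (VehNum, Control_circuit) group is non-zero, else 0". As a sanity check of that reading, here is a plain-Python sketch (no Spark involved; only the key columns and Flag from the expected table are kept) that reproduces the expected Control_Flag column. The names `rows`, `group_has_nonzero` and `control_flag` are illustrative only.

```python
# Plain-Python reference for the Control_Flag rule (no Spark involved).
# Each tuple is (VehNum, Control_circuit, Flag), copied from the question's table.
rows = [
    ("4234456", "DOC", 0),
    ("4234456", "DOC", 1),
    ("4234456", "DOC", 2),
    ("4234457", "ACR", 0),
    ("4234457", "ACR", 0),
    ("4234457", "ACR", 0),
    ("4234487", "QWR", 0),
    ("4234487", "QWR", 1),
    ("3423448", "QWR", 1),
]

# Pass 1: remember, per (VehNum, Control_circuit) key, whether any Flag is non-zero.
group_has_nonzero = {}
for veh, circuit, flag in rows:
    key = (veh, circuit)
    group_has_nonzero[key] = group_has_nonzero.get(key, False) or flag != 0

# Pass 2: every row inherits its group's answer, as 1 or 0.
control_flag = [int(group_has_nonzero[(veh, circuit)]) for veh, circuit, _ in rows]
print(control_flag)  # [1, 1, 1, 0, 0, 0, 1, 1, 1]
```

Note that 4234487/QWR and 3423448/QWR are separate groups because the key includes VehNum.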

How can I do this in PySpark?

Answer #1, by fcy6dtqo

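An aggregate window with SUM() does this: sum Flag over a window partitioned by VehNum and Control_circuit, then set Control_Flag to 1 wherever that sum is greater than 0.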
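
What makes a window aggregate suitable here is that, unlike groupBy().agg(), it does not collapse rows: every input row is kept and receives the total of its partition. A rough plain-Python emulation of that difference follows; the helpers `group_sums` and `window_sum` are hypothetical stand-ins written for illustration, not PySpark APIs.

```python
from collections import defaultdict

# (VehNum, Control_circuit, Flag) triples from the question's table.
rows = [
    ("4234456", "DOC", 0), ("4234456", "DOC", 1), ("4234456", "DOC", 2),
    ("4234457", "ACR", 0), ("4234457", "ACR", 0), ("4234457", "ACR", 0),
    ("4234487", "QWR", 0), ("4234487", "QWR", 1),
    ("3423448", "QWR", 1),
]

def group_sums(rows):
    """Like groupBy(...).agg(sum): one total per (VehNum, Control_circuit) key."""
    totals = defaultdict(int)
    for veh, circuit, flag in rows:
        totals[(veh, circuit)] += flag
    return dict(totals)

def window_sum(rows):
    """Like sum(...).over(partitionBy(...)): every row kept, each gets its key's total."""
    totals = group_sums(rows)
    return [(veh, circuit, flag, totals[(veh, circuit)]) for veh, circuit, flag in rows]

print(len(group_sums(rows)))     # 4 groups
windowed = window_sum(rows)
print(len(windowed))             # 9 rows, same as the input
print([r[3] for r in windowed])  # [3, 3, 3, 0, 0, 0, 1, 1, 1]
```

The per-row totals match the flag_sum column in the output further down (Spark may list the rows in a different order).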

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [
        ("4234456", "DOC", "ok", "A567UR", "Software Issue", 0),
        ("4234456", "DOC", "not_okay", "A568UR", "Software Issue", 1),
        ("4234456", "DOC", "not_okay", "A569UR", "Hardware Issue", 2),
        ("4234457", "ACR", "ok", "A234TY", "Hardware Issue", 0),
        ("4234457", "ACR", "ok", "A234TY", "Hardware Issue", 0),
        ("4234457", "ACR", "ok", "A234TY", "Hardware Issue", 0),
        ("4234487", "QWR", "ok", "A276TY", "Hardware Issue", 0),
        ("4234487", "QWR", "not_okay", "A872UR", "Hardware Issue", 1),
        ("3423448", "QWR", "not_okay", "A872UR", "Hardware Issue", 1),
    ],
    ["VehNum", "Control_circuit", "control_circuit_status", "partnumbers", "errors", "Flag"],
)

# One partition per (VehNum, Control_circuit) pair. No orderBy is needed:
# without ordering, the aggregate covers the whole partition.
df_agg_window = Window.partitionBy(
    "VehNum",
    "Control_circuit",
)

df = (
    df
    # Total of Flag within the partition, repeated on every row of that partition
    .withColumn(
        "flag_sum",
        F.sum("Flag").over(df_agg_window),
    )
    # Flag values are never negative, so a positive sum means some Flag is non-zero
    .withColumn(
        "Control_Flag",
        F.when(
            F.col("flag_sum") > 0,
            F.lit(1),
        )
        .otherwise(F.lit(0)),
    )
    # Uncomment to drop the helper column from the result
    # .drop("flag_sum")
)

df.show()

Output:

+-------+---------------+----------------------+-----------+--------------+----+--------+------------+
| VehNum|Control_circuit|control_circuit_status|partnumbers|        errors|Flag|flag_sum|Control_Flag|
+-------+---------------+----------------------+-----------+--------------+----+--------+------------+
|4234457|            ACR|                    ok|     A234TY|Hardware Issue|   0|       0|           0|
|4234457|            ACR|                    ok|     A234TY|Hardware Issue|   0|       0|           0|
|4234457|            ACR|                    ok|     A234TY|Hardware Issue|   0|       0|           0|
|4234487|            QWR|              not_okay|     A872UR|Hardware Issue|   1|       1|           1|
|4234487|            QWR|                    ok|     A276TY|Hardware Issue|   0|       1|           1|
|4234456|            DOC|                    ok|     A567UR|Software Issue|   0|       3|           1|
|4234456|            DOC|              not_okay|     A569UR|Hardware Issue|   2|       3|           1|
|4234456|            DOC|              not_okay|     A568UR|Software Issue|   1|       3|           1|
|3423448|            QWR|              not_okay|     A872UR|Hardware Issue|   1|       1|           1|
+-------+---------------+----------------------+-----------+--------------+----+--------+------------+
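
A note on the design choice: testing `flag_sum > 0` detects "any non-zero Flag" only because the Flag values here (0, 1, 2) are never negative. If negative values could appear, an aggregate that states the rule directly would be safer; one untested option, assuming the same window, is `F.max((F.col("Flag") != 0).cast("int")).over(df_agg_window)`. The plain-Python sketch below compares the two tests; the "hypothetical_signed" group is invented to show the divergence and does not come from the question.

```python
# Two ways to decide "does this group contain a non-zero Flag?".
# "hypothetical_signed" is invented data; the question's Flags are only 0, 1 or 2.
groups = {
    "from_question_DOC": [0, 1, 2],
    "from_question_ACR": [0, 0, 0],
    "hypothetical_signed": [-1, 1],
}

def flag_by_sum(flags):
    # Mirrors the answer: Control_Flag = 1 when the group's sum is positive.
    return int(sum(flags) > 0)

def flag_by_any_nonzero(flags):
    # States the rule directly: 1 if any Flag differs from 0.
    return int(any(f != 0 for f in flags))

for name, flags in groups.items():
    print(name, flag_by_sum(flags), flag_by_any_nonzero(flags))
# from_question_DOC 1 1
# from_question_ACR 0 0
# hypothetical_signed 0 1
```

For the data in the question both tests agree, so the answer's SUM() approach gives the expected result.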
