Compare all rows within groups of a PySpark DataFrame

Asked by 41ik7eoe on 2023-05-07 in Apache

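I have a DataFrame like the one below, and I need to compare the values of the first_nm and sur_nm columns across the rows of each company group. Depending on whether they match, I want to assign a value to a status column in the output.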

+--------+--------+----------------+--------------+
| company|      id|        first_nm|        sur_nm|
+--------+--------+----------------+--------------+
|SYNTHE01|SYNTHE02|           JAMES|        FOWLER|
|SYNTHE01|SYNTHE03|          MONICA|        FOWLER|
|SYNTHE01|SYNTHE04|          GEORGE|        FOWLER|
|SYNTHE08|SYNTHE05|           JAMES|        FIWLER|
|SYNTHE08|SYNTHE06|           JAMES|        FUWLER|
|SYNTHE08|SYNTHE07|           JAMES|        FAWLER|
|SYNTHE08|SYNTHE08|           JAMES|        FEWLER|
|SYNTHE11|SYNTHE12|           JAMES|        FOWLER|
|SYNTHE11|SYNTHE11|           JAMES|        FOWLER|
|SYNTHE09|SYNTHE0X|            Null|          Null|
|SYNTHE09|SYNTHE0Y|            Null|          Null|
|SYNTHE09|SYNTHE0Z|            Null|          Null|
+--------+--------+----------------+--------------+

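For example:
If first_nm and sur_nm both match across all rows of a company, the status for that company is 0.
If only first_nm matches within the company group, the status is 1.
If only sur_nm matches within the company group, the status is 2.
If nothing matches, or the values are null, the status is 99.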
The output DataFrame should look like this:

+--------+--------+----------------+--------------+-------+
| company|      id|        first_nm|        sur_nm| status|
+--------+--------+----------------+--------------+-------+
|SYNTHE01|SYNTHE02|           JAMES|        FOWLER|      2|
|SYNTHE01|SYNTHE03|          MONICA|        FOWLER|      2|
|SYNTHE01|SYNTHE04|          GEORGE|        FOWLER|      2|
|SYNTHE08|SYNTHE05|           JAMES|        FIWLER|      1|
|SYNTHE08|SYNTHE06|           JAMES|        FUWLER|      1|
|SYNTHE08|SYNTHE07|           JAMES|        FAWLER|      1|
|SYNTHE08|SYNTHE08|           JAMES|        FEWLER|      1|
|SYNTHE11|SYNTHE12|           JAMES|        FOWLER|      0|
|SYNTHE11|SYNTHE11|           JAMES|        FOWLER|      0|
|SYNTHE09|SYNTHE0X|            Null|          Null|     99|
|SYNTHE09|SYNTHE0Y|            Null|          Null|     99|
|SYNTHE09|SYNTHE0Z|            Null|          Null|     99|
+--------+--------+----------------+--------------+-------+

How can we compare values across different rows of the same column like this? Please advise.
Thank you
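For reference, the four rules can be sketched in plain Python (no Spark), assuming that a column "matches" when the company group holds exactly one distinct non-null value. This is the reading both answers below use. ROWS copies the sample data with None standing in for null; the function names are illustrative, not part of any library.

```python
from collections import defaultdict

# Sample rows from the question: (company, id, first_nm, sur_nm); None stands for null.
ROWS = [
    ("SYNTHE01", "SYNTHE02", "JAMES", "FOWLER"),
    ("SYNTHE01", "SYNTHE03", "MONICA", "FOWLER"),
    ("SYNTHE01", "SYNTHE04", "GEORGE", "FOWLER"),
    ("SYNTHE08", "SYNTHE05", "JAMES", "FIWLER"),
    ("SYNTHE08", "SYNTHE06", "JAMES", "FUWLER"),
    ("SYNTHE08", "SYNTHE07", "JAMES", "FAWLER"),
    ("SYNTHE08", "SYNTHE08", "JAMES", "FEWLER"),
    ("SYNTHE11", "SYNTHE12", "JAMES", "FOWLER"),
    ("SYNTHE11", "SYNTHE11", "JAMES", "FOWLER"),
    ("SYNTHE09", "SYNTHE0X", None, None),
    ("SYNTHE09", "SYNTHE0Y", None, None),
    ("SYNTHE09", "SYNTHE0Z", None, None),
]


def column_matches(values):
    """True when the group has exactly one distinct non-null value (nulls are ignored)."""
    return len({v for v in values if v is not None}) == 1


def group_status(first_names, surnames):
    """Apply the rules in order: both match -> 0, first_nm only -> 1, sur_nm only -> 2, else 99."""
    first_ok = column_matches(first_names)
    sur_ok = column_matches(surnames)
    if first_ok and sur_ok:
        return 0
    if first_ok:
        return 1
    if sur_ok:
        return 2
    return 99


def add_status(rows):
    """Collect each company's names, compute one status per company, then append it to every row."""
    groups = defaultdict(lambda: ([], []))
    for company, _id, first, sur in rows:
        groups[company][0].append(first)
        groups[company][1].append(sur)
    status = {company: group_status(f, s) for company, (f, s) in groups.items()}
    return [row + (status[row[0]],) for row in rows]


for row in add_status(ROWS):
    print(row)
```

The order of the checks matters: the "both match" case has to be tested before the single-column cases, or a fully matching company would be reported as 1.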

Answer 1 (vsnjm48y)

Your DataFrame (df):

+--------+--------+--------+------+
| company|      id|first_nm|sur_nm|
+--------+--------+--------+------+
|SYNTHE01|SYNTHE02|   JAMES|FOWLER|
|SYNTHE01|SYNTHE03|  MONICA|FOWLER|
|SYNTHE01|SYNTHE04|  GEORGE|FOWLER|
|SYNTHE08|SYNTHE05|   JAMES|FIWLER|
|SYNTHE08|SYNTHE06|   JAMES|FUWLER|
|SYNTHE08|SYNTHE07|   JAMES|FAWLER|
|SYNTHE08|SYNTHE08|   JAMES|FEWLER|
|SYNTHE11|SYNTHE12|   JAMES|FOWLER|
|SYNTHE11|SYNTHE11|   JAMES|FOWLER|
|SYNTHE09|SYNTHE0X|    null|  null|
|SYNTHE09|SYNTHE0Y|    null|  null|
|SYNTHE09|SYNTHE0Z|    null|  null|
+--------+--------+--------+------+

1. Import the necessary functions

from pyspark.sql.functions import col, when, size, collect_set

2. Get the number of distinct values of first_nm and sur_nm for each company

unique_df = df.groupBy("company").agg(
    size(collect_set("first_nm")).alias("first_nm_size"),
    size(collect_set("sur_nm")).alias("sur_nm_size")
)

3. Apply the conditions

# Check "both match" first, then each column on its own; everything else is 99
company_status_df = unique_df.withColumn(
    "status",
    when((col("first_nm_size") == 1) & (col("sur_nm_size") == 1), 0)
    .when(col("first_nm_size") == 1, 1)
    .when(col("sur_nm_size") == 1, 2)
    .otherwise(99),
).select("company", "status")

4. Join the result back to the original DataFrame df

df.join(company_status_df, "company").show()

Output:

+--------+--------+--------+------+------+
| company|      id|first_nm|sur_nm|status|
+--------+--------+--------+------+------+
|SYNTHE01|SYNTHE02|   JAMES|FOWLER|     2|
|SYNTHE01|SYNTHE03|  MONICA|FOWLER|     2|
|SYNTHE01|SYNTHE04|  GEORGE|FOWLER|     2|
|SYNTHE08|SYNTHE05|   JAMES|FIWLER|     1|
|SYNTHE08|SYNTHE06|   JAMES|FUWLER|     1|
|SYNTHE08|SYNTHE07|   JAMES|FAWLER|     1|
|SYNTHE08|SYNTHE08|   JAMES|FEWLER|     1|
|SYNTHE11|SYNTHE12|   JAMES|FOWLER|     0|
|SYNTHE11|SYNTHE11|   JAMES|FOWLER|     0|
|SYNTHE09|SYNTHE0X|    null|  null|    99|
|SYNTHE09|SYNTHE0Y|    null|  null|    99|
|SYNTHE09|SYNTHE0Z|    null|  null|    99|
+--------+--------+--------+------+------+
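The all-null SYNTHE09 group gets 99 because collect_set (and countDistinct in the next answer) skips nulls, so its distinct count is 0 and the when chain falls through to otherwise(99). This depends on the columns holding real nulls. The question's table prints Null; if that is actually the string "Null", it counts as one distinct value and the group would get status 0. A plain-Python sketch of the counting rule (an illustration of the semantics, not Spark itself; distinct_non_null is a hypothetical helper):

```python
def distinct_non_null(values):
    """Count distinct values while skipping None, like size(collect_set(c)) or countDistinct(c)."""
    return len({v for v in values if v is not None})


real_nulls = [None, None, None]
string_nulls = ["Null", "Null", "Null"]

print(distinct_non_null(["JAMES", "MONICA", "GEORGE"]))  # 3
print(distinct_non_null(real_nulls))    # 0, so the when chain falls through to 99
print(distinct_non_null(string_nulls))  # 1, so the group would count as a full match (status 0)
```

If the source data does contain the literal string, converting those values to real nulls before grouping would restore the expected 99.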

Answer 2 (x8diyxa7)

Group by company and apply chained when conditions to the number of distinct values in each group:

import pyspark.sql.functions as F

df = df.join(df.groupby('company')
             .agg(F.when((F.countDistinct('first_nm') == 1) & (F.countDistinct('sur_nm') == 1), 0)
                   .when(F.countDistinct('first_nm') == 1, 1)
                   .when(F.countDistinct('sur_nm') == 1, 2).otherwise(99)
                   .alias('status')), on='company')

df.show(truncate=False)
+--------+--------+--------+------+------+
|company |id      |first_nm|sur_nm|status|
+--------+--------+--------+------+------+
|SYNTHE01|SYNTHE02|JAMES   |FOWLER|2     |
|SYNTHE01|SYNTHE03|MONICA  |FOWLER|2     |
|SYNTHE01|SYNTHE04|GEORGE  |FOWLER|2     |
|SYNTHE08|SYNTHE05|JAMES   |FIWLER|1     |
|SYNTHE08|SYNTHE06|JAMES   |FUWLER|1     |
|SYNTHE08|SYNTHE07|JAMES   |FAWLER|1     |
|SYNTHE08|SYNTHE08|JAMES   |FEWLER|1     |
|SYNTHE11|SYNTHE12|JAMES   |FOWLER|0     |
|SYNTHE11|SYNTHE11|JAMES   |FOWLER|0     |
|SYNTHE09|SYNTHE0X|null    |null  |99    |
|SYNTHE09|SYNTHE0Y|null    |null  |99    |
|SYNTHE09|SYNTHE0Z|null    |null  |99    |
+--------+--------+--------+------+------+
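Both answers treat a column as matching when it has exactly one distinct non-null value, so a group such as [JAMES, null, JAMES] still counts as a first_nm match. If a null anywhere in the group should instead rule out a match (one possible reading of "the values are null, status 99"), the check also needs every row to be non-null. In Spark that could be expressed by additionally requiring F.count('first_nm') == F.count('*') inside the when, since count on a column skips nulls while count('*') counts every row. A plain-Python sketch of the difference, where both function names are hypothetical:

```python
def matches_lenient(values):
    # What both answers compute: exactly one distinct non-null value.
    return len({v for v in values if v is not None}) == 1


def matches_strict(values):
    # Hypothetical stricter rule: no nulls at all, and exactly one distinct value.
    return None not in values and len(set(values)) == 1


group = ["JAMES", None, "JAMES"]
print(matches_lenient(group))  # True
print(matches_strict(group))   # False
```

Under either rule a company with a single non-null row still gets status 0, since one value trivially matches itself.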
