如何在与pyspark中的其他 Dataframe 连接时更新 Dataframe 列值?

0dxa2lsx  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(248)

我有3个 Dataframe df1(EMPLOYEE_INFO),df2(DEPARTMENT_INFO),df3(COMPANY_INFO),我想通过连接所有三个 Dataframe 来更新df1中的一个列。列名为FLAG_DEPARTMENT,它位于df1中。我需要设置FLAG_DEPARTMENT='POLITICS'。在sql中,查询如下所示。

UPDATE [COMPANY_INFO] INNER JOIN ([DEPARTMENT_INFO] 
INNER JOIN [EMPLOYEE_INFO] ON [DEPARTMENT_INFO].DEPT_ID = [EMPLOYEE_INFO].DEPT_ID)
ON [COMPANY_INFO].[COMPANY_DEPT_ID] = [DEPARTMENT_INFO].[DEP_COMPANYID]
SET EMPLOYEE_INFO.FLAG_DEPARTMENT = "POLITICS";

如果这三个表的列中的值匹配,我需要在employee_Info表中设置FLAG_DEPARTMENT='POLITICS'
我怎么能在pyspark中实现同样的事情呢?我刚刚开始学习pyspark,没有那么多深度的知识?

kqlmhetl

kqlmhetl1#

您可以使用join的链,在其顶部有一个select
假设您有下列pyspark DataFrame s:

employee_df
+---------+-------+
|     Name|dept_id|
+---------+-------+
|     John| dept_a|
|      Liù| dept_b|
|     Luke| dept_a|
|  Michail| dept_a|
|      Noe| dept_e|
|Shinchaku| dept_c|
|     Vlad| dept_e|
+---------+-------+

department_df
+-------+----------+------------+
|dept_id|company_id| description|
+-------+----------+------------+
| dept_a|  company1|Department A|
| dept_b|  company2|Department B|
| dept_c|  company5|Department C|
| dept_d|  company3|Department D|
+-------+----------+------------+

company_df
+----------+-----------+
|company_id|description|
+----------+-----------+
|  company1|  Company 1|
|  company2|  Company 2|
|  company3|  Company 3|
|  company4|  Company 4|
+----------+-----------+

然后,您可以执行下列程式码,将flag_department数据行加入employee_df

from pyspark.sql import functions as F

employee_df = (
        employee_df.alias('a')
        .join(
            department_df.alias('b'),
            on='dept_id',
            how='left',
        )
        .join(
            company_df.alias('c'),
            on=F.col('b.company_id') == F.col('c.company_id'),
            how='left',
        )
        .select(
            *[F.col(f'a.{c}') for c in employee_df.columns],
            F.when(
                F.col('b.dept_id').isNotNull() & F.col('c.company_id').isNotNull(),
                F.lit('POLITICS')
            ).alias('flag_department')
        )
    )

新的employee_df将是:

+---------+-------+---------------+
|     Name|dept_id|flag_department|
+---------+-------+---------------+
|     John| dept_a|       POLITICS|
|      Liù| dept_b|       POLITICS|
|     Luke| dept_a|       POLITICS|
|  Michail| dept_a|       POLITICS|
|      Noe| dept_e|           null|
|Shinchaku| dept_c|           null|
|     Vlad| dept_e|           null|
+---------+-------+---------------+

相关问题