如何在PySpark中在同一个查询中同时进行连接和更新?

iezvtpos  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(138)

我有一个SQL查询,我试图将其转换为PySpark。在SQL查询中,我们连接两个表,并更新条件匹配的列。SQL查询如下所示:

UPDATE [STUDENT_TABLE] INNER JOIN [COLLEGE_DATA]
ON ([STUDENT_TABLE].UNIQUEID = COLLEGE_DATA.PROFESSIONALID) 
AND ([STUDENT_TABLE].[ADDRESS] = COLLEGE_DATA.STATE_ADDRESS) 
SET STUDENT_TABLE.STUDENTINSTATE = "REGULAR"
WHERE (((STUDENT_TABLE.BLOCKERS) Is Null));
efzxgjgh

efzxgjgh1#

输入示例:

from pyspark.sql import functions as F
df_stud = spark.createDataFrame(
    [(1, 'x', None, 'REG'),
     (2, 'y', 'qwe', 'REG')],
    ['UNIQUEID', 'ADDRESS', 'BLOCKERS', 'STUDENTINSTATE'])

df_college = spark.createDataFrame([(1, 'x'), (2, 'x')], ['PROFESSIONALID', 'STATE_ADDRESS'])

您的查询将只更新df_stud的第一行-列“STUDENTINSTATE”中的值将变为“REGULAR”。
在下面的脚本中,我们先对join执行,然后对df_stud中的所有列执行select,但必须更新的列“STUDENTINSTATE”除外。如果列“PROFESSIONALID”(来自df_college)不为空如果不满足连接条件,则不应更新该值,因此将按原样从列“STUDENTINSTATE”中获取该值。

join_on = (df_stud.UNIQUEID == df_college.PROFESSIONALID) & \
          (df_stud.ADDRESS == df_college.STATE_ADDRESS) & \
          df_stud.BLOCKERS.isNull()
df = (df_stud.alias('a')
    .join(df_college.alias('b'), join_on, 'left')
    .select(
        *[c for c in df_stud.columns if c != 'STUDENTINSTATE'],
        F.expr("nvl2(b.PROFESSIONALID, 'REGULAR', a.STUDENTINSTATE) STUDENTINSTATE")
    )
)

df.show()

# +--------+-------+--------+--------------+

# |UNIQUEID|ADDRESS|BLOCKERS|STUDENTINSTATE|

# +--------+-------+--------+--------------+

# |       1|      x|    null|       REGULAR|

# |       2|      y|     qwe|           REG|

# +--------+-------+--------+--------------+

相关问题