如何将'if the do'从SAS转换为Pyspark

nr7wwzry  于 2023-03-07  发布在  Spark
关注(0)|答案(1)|浏览(201)

我有这样一个df:
| 邮编|城市|
| - ------|- ------|
| "河流"|"伦敦"|
| ""|罗马|
| "河流"|柏林|
| "河流"|"马德里"|
| ""|"慕尼黑"|
| ""|"巴黎"|
我在Pyspark中转换的SAS代码

if Zip = '' 
    then do Flud = City;
            Flag = 'City';
         end;
    else do Flud = Zip; 
            Flag = 'Zip';
         end;

所以我期待这样的输出:
| 邮编|城市|流体|旗帜|
| - ------|- ------|- ------|- ------|
| "河流"|"伦敦"|"河流"|"邮编"|
| ""|罗马|罗马|"城市"|
| "河流"|柏林|"河流"|"邮编"|
| "河流"|"马德里"|"河流"|"邮编"|
| ""|"慕尼黑"|"慕尼黑"|"城市"|
| ""|"巴黎"|"巴黎"|"城市"|
我已经转换在Pyspark,但给我错误:
我的代码:

output=df.withColumn('Flag',when((col('Zip').isNull()) & (col('Flag') == 'City'), col('Flud')==col('City'))
         .otherwise(when((col('Zip').isNotNull()) & (col('Flag') == 'Zip'), col('Flud')==col('Zip'))
         .otherwise(col('Zip'))))

Pyspark给我这个错误:

AnalysisException: Column 'Flag' does not exist.

所以我试着先创建变量

df= df.withColumn("Flag", lit(''))

写这个新代码:

output=df.withColumn('Flud',when((col('Zip').isNull()) & (col('Flag') == 'City'), col('Flud')==col('City'))
         .otherwise(when((col('Zip').isNotNull()) & (col('Flag') == 'Zip'), col('Flud')==col('Zip'))
         .otherwise(col('Zip'))))

现在我得到了这个错误:

Column 'Flud' does not exist.

我尝试先创建第二个变量

df= df.withColumn("Flud", lit(''))

写这个新代码:

output=df.withColumn('Flud',when((col('Zip').isNull()) & (col('Flag') == 'City'), col('Flud')==col('City'))
         .otherwise(when((col('Zip').isNotNull()) & (col('Flag') == 'Zip'), col('Flud')==col('Zip'))
         .otherwise(col('Zip'))))

现在我得到了这个错误:

AnalysisException: cannot resolve 'CASE WHEN ((Flud.Zip IS NOT NULL) AND (Flag = 'Zip')) THEN (Flud = Flud.Zip) ELSE Flud.Zip END' due to data type mismatch: THEN and ELSE expressions should all be same type or coercible to a common type, got CASE WHEN ... THEN boolean ELSE string END;

有人能帮我吗?

eyh26e7m

eyh26e7m1#

当在pyspark中使用when函数时,您具有以下结构:
when(EXPRESSION,VALUE IF TRUE).otherwise(VALUE IF FALSE)。根据您的代码,当表达式为True时,您将生成另一个布尔值,比较两列。您需要在此处通知所需的值,并且它们都应具有相同的数据类型。使用示例输出,您可以执行以下操作:

import pyspark.sql.functions as F

zip_is_null = F.col("Zip").isNull()

output = (
    df.withColumn(
        "Flag",
        F.when(zip_is_null, F.lit("City"))
         .otherwise(F.lit("Zip"))
     ).withColumn(
        "Flud",
        F.when(zip_is_null, F.col("City"))
         .otherwise(F.col("Zip"))
     )
)

在您的例子中,您将需要两个CASE WHEN,因为它们将生成两个不同的列FludFlag

相关问题