%python
def has_column(df, col):
try:
df[col]
return True
except AnalysisException:
return False
df = spark.createDataFrame([ \
("C","I"), \
("I","I"), \
("C","B"), \
], ["B2B","E1J"])
df.show()
+---+---+
|B2B|E1J|
+---+---+
| C| I|
| I| I|
| C| B|
+---+---+
现在我想做的是:检查一个列是否存在并且仅当它存在时,然后检查它的值并在此基础上为标志列指定一个值
df.withColumn("flag",when( ((lit(has_column(df, "B2B"))) & (col("B2B")=="C") ) , 1).otherwise(0)).show()
+---+---+----+
|B2B|E1J|flag|
+---+---+----+
| C| I| 1|
| I| I| 0|
| C| B| 1|
+---+---+----+
我的问题是,这些检查条件不是静态的,而是从外部文件读取并动态生成的,它可能有实际Dataframe没有的列,并导致如下错误。
有什么办法解决这个问题吗?
例如:
df.withColumn("flag", \
when( \
(lit(has_column(df, "GBC"))) & (col("GBC")=="C") | \
(lit(has_column(df, "B2B"))) & (col("B2B")=="C") \
, 1)) \
.otherwise(0).show()
org.apache.spark.sql.AnalysisException: cannot resolve '`GBC`' given input columns: [B2B, E1J];;
1条答案
按热度按时间juud5qan1#
错误是由
col('GBC')
. 可以使用以下代码对可能不存在的列进行预测。