I am trying to concatenate multiple DataFrame columns, but I cannot get PySpark to accept eval or expr applied to the when expressions inside concat_ws.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws,concat,when,col,expr
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("foo", "bar"), ("ba z", None)],
                           ('a', 'b'))
keys = ['a','b']
key_val = ''
for key in keys:
    key_val = key_val + 'when(df["{0}"].isNull(), lit("_")).otherwise(df["{0}"]),'.format(key)
key_val_exp = key_val.rsplit(',', 1)[0]
spaceDeleteUDF = udf(lambda s: str(s).replace(" ", "_").strip(), StringType())
df=df.withColumn("unique_id", spaceDeleteUDF(concat_ws("-",eval(key_val_exp))))
Error:
"TypeError: Invalid argument, not a string or column: (Column<b'CASE WHEN (a IS NULL) THEN _ ELSE a END'>, Column<b'CASE WHEN (b IS NULL) THEN _ ELSE b END'>) of type <class 'tuple'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function."
Expected output:
+----+----+---------+
|   a|   b|unique_id|
+----+----+---------+
| foo| bar|  foo-bar|
|ba z|null|   ba_z-_|
+----+----+---------+
1 Answer
Take a look at this.