pyspark eval or expr - concatenating multiple DataFrame columns with when statements

ql3eal8s · posted 2021-05-27 in Spark

I am trying to concatenate multiple DataFrame columns, but I cannot get pyspark eval or expr to work on the when statements inside concat_ws.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import concat_ws,concat,when,col,expr
from pyspark.sql.functions import lit
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("foo", "bar"), ("ba z", None)],
                            ('a', 'b'))
keys = ['a','b']
key_val = ''
for key in keys:
    key_val = key_val + 'when(df["{0}"].isNull(), lit("_")).otherwise(df["{0}"]),'.format(key) 
key_val_exp = key_val.rsplit(',', 1)[0]
spaceDeleteUDF = udf(lambda s: str(s).replace(" ", "_").strip(), StringType())
df=df.withColumn("unique_id", spaceDeleteUDF(concat_ws("-",eval(key_val_exp))))

Error:

"TypeError: Invalid argument, not a string or column: (Column<b'CASE WHEN (a IS NULL) THEN _ ELSE a END'>, Column<b'CASE WHEN (b IS NULL) THEN _ ELSE b END'>) of type <class 'tuple'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function."

Expected output:

+----+----+---------+
|   a|   b|unique_id|
+----+----+---------+
| foo| bar|  foo-bar|
|ba z|null|   ba_z-_|
+----+----+---------+

pvcm50d1 (answer 1)

Take a look at this.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("foo", "bar"), ("ba z", None)],
                           ('a', 'b'))

df.show()

# +----+----+
# |   a|   b|
# +----+----+
# | foo| bar|
# |ba z|null|
# +----+----+

# Add a "_mod" helper column per input column, replacing nulls with "_".
df1 = df.select(
    *[F.col(column) for column in df.columns],
    *[F.when(F.col(column).isNull(), F.lit('_')).otherwise(F.col(column)).alias(column + '_mod')
      for column in df.columns]
)

# Replace whitespace with "_" in the helper columns.
df2 = df1.select(
    *[F.col(column) for column in df1.columns if '_mod' not in column],
    *[F.regexp_replace(column, r'\s', '_').alias(column) for column in df1.columns if '_mod' in column]
)

# Concatenate the helper columns with "-" into unique_id and drop them.
df3 = df2.select(
    *[F.col(column) for column in df1.columns if '_mod' not in column],
    F.concat_ws('-', *[F.col(column) for column in df2.columns if '_mod' in column]).alias('unique_id')
)

df3.show()

# +----+----+---------+
# |   a|   b|unique_id|
# +----+----+---------+
# | foo| bar|  foo-bar|
# |ba z|null|   ba_z-_|
# +----+----+---------+
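
As a side note, the same idea can be written without the intermediate DataFrames, and without eval or a UDF, by building the when expressions in a Python list and unpacking them into concat_ws. A sketch, assuming the df and the F alias from above (df_alt and null_safe are just illustrative names):

# Build the null-safe columns in a list, then unpack them with *.
keys = ['a', 'b']
null_safe = [F.when(F.col(k).isNull(), F.lit('_')).otherwise(F.col(k)) for k in keys]

df_alt = df.withColumn(
    'unique_id',
    F.regexp_replace(F.concat_ws('-', *null_safe), r'\s', '_')   # spaces -> "_"
)
df_alt.show()   # should match the unique_id column shown above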
