pyspark: split a single column with multiple values into separate columns

t3psigkw · posted 2021-05-27 in Spark

I have a column in my dataset that I need to split into multiple columns.
Here is an example of that column, contextMap_ID1, together with the result I want.
The code below creates the sample (the contextMap_ID1 column) and the expected result (the remaining columns, except the second). The second column explains the logic I expect to be applied.

# Note: numeric values are quoted so that every column infers as StringType;
# mixing int and str in one column would make schema inference fail.
dfx = sc.parallelize([
              ("blah blah blah createdTimeStamp=2020-08-11 15:31:37.458 blah blah blah","contains the word 'TimeStamp' >> do not process","","","")
             ,("123456789","NUMERIC 9 digit number >> caseId","123456789","","")
             ,("caseId: 2345678 personId: 87654321","Multiple key value pairs >> New Column(s) with key as column Name","2345678","87654321","")
             ,("CRON","AlphaNumeric without ':' >> Do not process","","","")
             ,("ABC9876543210","Alpha-NUMERIC starting with 'ABC' >> New Column","","","ABC9876543210")
            ]).toDF(["contextMap_ID1","Description of rules","caseId","personId","ABC"])
dfx.show(truncate=False)
8hhllhi2 · answer 1#

What you can do is build a chain of when conditions based on a list of regexes to decide how to process each row. Then, depending on the rule that matched, you can extract a list of key-value pairs (column name and value).
The logic in the code below may not be exactly what you need (although it does produce the output you expect), but written this way you can easily add or modify conditions.
It could look like this:

from pyspark.sql import functions as F

# So let's first define the conditions and the associated logic
transfo = dict()

# Key-value pairs like "caseId: 2345678": normalize "key : value"
# to "key:value", then split the string on whitespace
transfo['^([a-zA-Z0-9]+\\s*:\\s*[a-zA-Z0-9]+\\s*)+$'] = F.split(
    F.regexp_replace(F.col('contextMap_ID1'), "\\s*:\\s*", ":"), "\\s+")

# 9-digit number: use it as the caseId value
transfo['^[0-9]{9}$'] = F.array(F.concat_ws(':',
    F.lit("caseId"),
    F.col("contextMap_ID1")))

# Three letters followed by a number: key is the letters, value the digits
transfo['^[A-Z]{3}[0-9]+$'] = F.array(F.concat_ws(':',
    F.regexp_extract(F.col("contextMap_ID1"), '[A-Z]+', 0),
    F.regexp_extract(F.col("contextMap_ID1"), '[0-9]+', 0)))

# Let's combine the conditions into a chain of when/otherwise.
# The initialization of my_fun is meant to avoid discarding rows
# without key-value pairs.
my_fun = F.array(F.lit('caseId'))
for x in transfo:
    my_fun = F.when(F.col('contextMap_ID1').rlike(x),
                    transfo[x]).otherwise(my_fun)
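
To sanity-check the chain before pivoting, you can evaluate my_fun on its own; this quick inspection step is my addition, not part of the original answer:

# Show the key-value array produced for each row by the when/otherwise chain
dfx.select('contextMap_ID1', my_fun.alias('keyvalues')).show(truncate=False)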

Once the main transformation is ready, we can put everything together. We explode my_fun to obtain the key-value pairs, extract the key and the value, and pivot on the key to generate the new columns.
Note that we add an id in case contextMap_ID1 is not unique. If it is unique, you can remove the id.

dfx\
    .select('contextMap_ID1', F.monotonically_increasing_id().alias('id'))\
    .select('contextMap_ID1', 'id', F.explode(my_fun).alias("keyvalue"))\
    .withColumn("key", F.split(F.col('keyvalue'), ":").getItem(0))\
    .withColumn("value", F.split(F.col('keyvalue'), ":").getItem(1))\
    .groupBy("id", "contextMap_ID1")\
    .pivot("key")\
    .agg(F.first(F.col('value')))\
    .show(truncate=False)
+-----------+----------------------------------------------------------------------+----------+---------+--------+
|id         |contextMap_ID1                                                        |ABC       |caseId   |personId|
+-----------+----------------------------------------------------------------------+----------+---------+--------+
|34359738368|caseId: 2345678 personId: 87654321                                    |null      |2345678  |87654321|
|25769803776|123456789                                                             |null      |123456789|null    |
|60129542144|ABC9876543210                                                         |9876543210|null     |null    |
|8589934592 |blah blah blah createdTimeStamp=2020-08-11 15:31:37.458 blah blah blah|null      |null     |null    |
|51539607552|CRON                                                                  |null      |null     |null    |
+-----------+----------------------------------------------------------------------+----------+---------+--------+
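
If contextMap_ID1 is unique per row, a minimal variant (my sketch, under that assumption) can skip the helper id entirely:

# A minimal sketch assuming contextMap_ID1 is unique per row,
# so no helper id is needed before the pivot.
dfx\
    .select('contextMap_ID1', F.explode(my_fun).alias('keyvalue'))\
    .withColumn('key', F.split(F.col('keyvalue'), ':').getItem(0))\
    .withColumn('value', F.split(F.col('keyvalue'), ':').getItem(1))\
    .groupBy('contextMap_ID1')\
    .pivot('key')\
    .agg(F.first(F.col('value')))\
    .show(truncate=False)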
