Pyspark将列转换为结构体数组

oxf4rvwz  于 2023-06-21  发布在  Spark
关注(0)|答案(2)|浏览(106)

我有以下schema:

root
 |-- data: struct (nullable = true)
 |    |-- user_agent_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_value_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_value_DEVICE_METADATA: string (nullable = true)

我需要添加一个带有标记的列作为结构体数组,如下所示:

root
 |-- data: struct (nullable = true)
 |    |-- user_agent_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_0_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_1_value_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_epochTs_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_name_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_timestamp_DEVICE_METADATA: string (nullable = true)
 |    |-- tags_2_value_DEVICE_METADATA: string (nullable = true)
 |-- device_metadata_tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |    |-- epochTs: double (nullable = true)
 |    |    |-- timestamp: double (nullable = true)

但是正如你在标签中看到的,我并不总是有我需要的所有列,例如我没有tags_0_value_DEVICE_METADATA,我不知道标签列表有多长,这里是3,但它可以改变。有办法做到这一点吗?

q9yhzks0

q9yhzks01#

您可以使用columns获取所有列名,然后使用列名对数据进行重新排序:

1.获取列名,提取标签和后缀

df = ...
cols=df.select("data.*").columns
cols.remove("user_agent_DEVICE_METADATA")

import re
tag=set(map(lambda s: re.search('(tags_.*?)_',s).group(1),cols))
suffix=set(map(lambda s: re.search('(tags_.*?)_(.*)',s).group(2),cols))

2.计算结构体的列对象

对于每个标记,构造一个结构。每个结构体由所有可能的后缀组成。如果原始 Dataframe 中不存在标签/后缀组合,则使用null填充。

from pyspark.sql import functions as F

def getStructElements(t):
      for s in suffix:
            col = t + "_" + s
            name = re.search('(.*?)_',s).group(1)
            if col in cols:
                  yield(F.col("data." + col).alias(name))
            else:
                  yield(F.lit(None).alias(name))

def createStructs():
      for t in tag:
            s = list(getStructElements(t))
            yield F.struct(s)

3.选择所需格式的数据

df.select(F.col("data"), F.array(list(createStructs())).alias("device_metadata_tags"))
y3bcpkx1

y3bcpkx12#

如果您的数据列结构良好(命名总是tags_x_y_DEVICE_METADATA),并且输出固定为name, value, epochTs, timestamp
你可以试试这种方法。
首先,将tags_x_收集到一个结构体中,并填充任何缺失的列。

output_cols = ['name', 'value', 'epochTs', 'timestamp']

# For tags_0_***
F.struct(*[F.col(f'data.tags_0_{c}_DEVICE_METADATA').alias(c) 
           if f'tags_0_{c}_DEVICE_METADATA' in df.select('data.*').columns
           else F.lit(None).alias(c)                              
           for c in output_cols])

tags_0/1/2循环此操作。然后,将所有struct收集到一个数组中。
最终密码。

df = (df.select('data', *[F.struct(*[F.col(f'data.tags_{num}_{c}_DEVICE_METADATA').alias(c) 
                                     if f'tags_{num}_{c}_DEVICE_METADATA' in df.select('data.*').columns
                                     else F.lit(None).alias(c)                              
                                     for c in output_cols]
                                  ).alias(f'tags_{num}')
                          for num in range(3)])
      .select('data', F.array(*[F.col(f'tags_{num}') for num in range(3)]).alias('device_metadata_tags')))

相关问题