PySpark: convert an array of key/value structs into a single struct

Posted by iszxjhcz on 2023-11-21 in Apache

I have a column that is an arbitrary-length array of key/value structs:

StructType([
    StructField("key", StringType(), False),
    StructField("value", StructType([
        StructField("string_type", StringType(),  True),
        StructField("int_type",    IntegerType(), True),
        StructField("float_type",  FloatType(),   True),
        StructField("double_type", DoubleType(),  True)
    ]), True)
])

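I know that there are only a few distinct key names, and I know the data type of each one. For example, name is always a string, birth_year is always an integer, and so on. Not every attribute is always present, so every field of the predefined struct has to be nullable, for example: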

StructType([
    StructField("first_name",  StringType(),  True),
    StructField("middle_name", StringType(),  True),
    StructField("last_name",   StringType(),  True),
    StructField("birth_year",  IntegerType(), True),
    StructField("ssn",         IntegerType(), True),
    StructField("zipcode",     IntegerType(), True),
])


My incoming column looks like this:

[
    (key: "first_name", value: (string_type: "John")),
    (key: "ssn",        value: (int_type:    123456789)),
    (key: "last_name",  value: (string_type: "Doe")),
]
------------------------------------------------------
[
    (key: "ssn",        value: (int_type:    987654321)),
    (key: "last_name",  value: (string_type: "Jones")),
]
------------------------------------------------------
[
    (key: "zipcode",    value: (int_type:    13579)),
    (key: "first_name", value: (string_type: "Bob")),
    (key: "birth_year", value: (int_type:    1985)),
    (key: "last_name",  value: (string_type: "Smith")),
]


I want to turn them into a single column of person structs, like this:

{
    first_name: "John",
    last_name:  "Doe",
    ssn:        123456789
}
------------------------------------------------------
{
    last_name:  "Jones",
    ssn:        987654321
}
------------------------------------------------------
{
    first_name: "Bob",
    last_name:  "Smith",
    birth_year: 1985,
    zipcode:    13579
}
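
The intended mapping can be pinned down with a small plain-Python reference model. This is only a sketch for checking small samples, not a solution, since the question rules out Python UDFs. The name `to_person`, the dict representation of the rows, and the rules "first occurrence wins" and "unknown keys are ignored" are assumptions made for illustration.

```python
# Hypothetical reference model in plain Python, for checking small samples only.
# Maps each target field to the member of the value struct that holds its data.
FIELDS = {
    "first_name": "string_type",
    "middle_name": "string_type",
    "last_name": "string_type",
    "birth_year": "int_type",
    "ssn": "int_type",
    "zipcode": "int_type",
}


def to_person(entries):
    """Collapse a list of {"key": ..., "value": {...}} entries into one dict.

    Every target field appears in the result; missing keys become None,
    mirroring the all-nullable target struct. Assumptions: if a key occurs
    more than once the first occurrence wins, and unknown keys are ignored.
    """
    person = {name: None for name in FIELDS}
    seen = set()
    for entry in entries:
        key = entry["key"]
        if key in FIELDS and key not in seen:
            seen.add(key)
            person[key] = entry["value"].get(FIELDS[key])
    return person


row = [
    {"key": "first_name", "value": {"string_type": "John"}},
    {"key": "ssn", "value": {"int_type": 123456789}},
    {"key": "last_name", "value": {"string_type": "Doe"}},
]
print(to_person(row))
# {'first_name': 'John', 'middle_name': None, 'last_name': 'Doe',
#  'birth_year': None, 'ssn': 123456789, 'zipcode': None}
```

A model like this is mainly useful as an oracle: run it on a few collected rows and compare against whatever the Spark expression produces.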


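This is a toy example, but the real data will have billions of rows, so performance matters. The solution should not use a Python UDF; it should only use functions from pyspark.sql.functions.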

Answer #1 (7eumitmz)

For each field of the desired struct, filter can be used to extract the matching value from the array:

from pyspark.sql import functions as F

df = ...  # input DataFrame with the array column 'person'

# a list of all possible struct entries in the input data
cfgs = [
    ("first_name", "string_type"),
    ("middle_name", "string_type"),
    ("last_name", "string_type"),
    ("birth_year", "int_type"),
    ("ssn", "int_type"),
    ("zipcode", "int_type")
]

cols = [
    # for each (key, field) pair in cfgs:
    (F.filter(F.col('person'), lambda c: c['key'] == key)  # keep the array elements with the matching key
      [0]          # take the first match (if any)
      ['value']    # take the value struct
      [field])     # take the matching field of the value struct
    .alias(key)    # name the column after the key
  for key, field in cfgs]

# combine the columns into a new struct
new_df = df.select(F.struct(cols).alias('person'))

Test result:

+------------------------------------------+
|person                                    |
+------------------------------------------+
|{John, null, Doe, null, 123456789, null}  |
|{null, null, Jones, null, 987654321, null}|
|{Bob, null, Smith, 1985, null, 13579}     |
+------------------------------------------+

root
 |-- person: struct (nullable = false)
 |    |-- first_name: string (nullable = true)
 |    |-- middle_name: string (nullable = true)
 |    |-- last_name: string (nullable = true)
 |    |-- birth_year: long (nullable = true)
 |    |-- ssn: long (nullable = true)
 |    |-- zipcode: long (nullable = true)
