Apache Spark: convert a struct of structs into an array of structs, pulling the struct field names in

zour9fqk, posted on 2022-11-16 in Apache

I am trying to convert the following schema:

 |-- a: struct (nullable = true)
 |    |-- b: struct (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)
 |    |-- c: struct (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)

into this:

 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- struct_key: string (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- struct_key: string (nullable = true)
 |    |    |-- one: double (nullable = true)
 |    |    |-- two: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- three: string (nullable = true)
 |    |    |-- four: boolean (nullable = true)

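In essence, I just want to take each inner struct's key (b, c, ...), turn it into a string, and add it to the struct as a struct_key field. The dataset contains many structs like b and c, so the conversion needs some kind of wildcard rather than hard-coded field names.
I am using Spark 3.2.1.
The data comes from JSON, so it is read like this: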

df = spark.read.json(json_file)
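
To reproduce the setup, a record of roughly this shape could be written to a JSON Lines file before calling spark.read.json. This is only a sketch: the file name sample.json is made up for illustration, the values mirror the test row used in the first answer below, and the real dataset would have many more keys under a.

```python
import json
import os
import tempfile

# Hypothetical sample record; values mirror the test row used in the first answer.
inner = {"one": 1.1, "two": [], "three": "", "four": True}
record = {"a": {"b": inner, "c": dict(inner, one=2.2)}}

# Write one JSON object per line, the layout spark.read.json expects by default.
json_file = os.path.join(tempfile.mkdtemp(), "sample.json")
with open(json_file, "w") as fh:
    fh.write(json.dumps(record) + "\n")

# Read it back with the standard library to check the shape.
with open(json_file) as fh:
    loaded = json.loads(fh.readline())
print(sorted(loaded["a"]))  # the keys that end up as struct fields of `a`
```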
Answer 1 (4c8rllxm)

I prefer this approach, because we often need to keep the other columns that exist in the original DataFrame.

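from pyspark.sql import functions as F

# DDL string of the first inner struct (the inner structs are assumed to share this type)
dtype = df.schema['a'].dataType[0].dataType.simpleString()
# struct -> JSON -> map<string, struct>, copy each key into its value, keep the values
df = df.withColumn('a', F.map_values(F.transform_values(
    F.from_json(F.to_json("a"), f'map<string,{dtype}>'),
    lambda k, v: v.withField('struct_key', k)
)))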

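df.schema['a'].dataType[0].dataType.simpleString() navigates to the first inner struct and extracts its schema as a DDL string. Because a map has a single value type, this relies on every inner struct having the same schema as the first one.
F.from_json(F.to_json("a"), f'map<string,{dtype}>') converts the struct a into a map. The field names of a become the map keys, and the inner structs become the map values. This conversion is useful because it makes the field names accessible as data.
F.transform_values(..., lambda k, v: v.withField('struct_key', k)) adds each map key to its map value (that is, to the inner struct) as a new struct_key field.
F.map_values() extracts the values of the map, which produces an array of structs.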
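
The four steps above can be traced without a Spark session. What follows is a plain-Python analogue, not Spark code: dicts stand in for structs and maps, a list stands in for the array, and the variable names are made up for illustration. The dtype string is what simpleString() should return for the schema in the question.

```python
import json

# One row of column `a`, written as a nested dict (same values as the test input).
a = {
    "b": {"one": 1.1, "two": [], "three": "", "four": True},
    "c": {"one": 2.2, "two": [], "three": "", "four": True},
}

# Step 1: the DDL string of the inner struct, and the map type built from it.
dtype = "struct<one:double,two:array<string>,three:string,four:boolean>"
map_ddl = f"map<string,{dtype}>"

# Step 2: to_json / from_json round trip. In Spark this re-reads the struct as a map;
# in plain Python the dict already plays the role of the map.
as_map = json.loads(json.dumps(a))

# Step 3: transform_values + withField: copy each key into its own value.
with_key = {k: {**v, "struct_key": k} for k, v in as_map.items()}

# Step 4: map_values: keep only the values, giving a list of "structs".
result = list(with_key.values())
print(map_ddl)
print(result)
```

As in the Spark result, the new struct_key field ends up last in each element.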
Test input:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(9, ((1.1, [], '', True),(2.2, [], '', True),),)],
    'x int, a struct<b:struct<one:double,two:array<string>,three:string,four:boolean>,c:struct<one:double,two:array<string>,three:string,four:boolean>>')

df.show(truncate=0)
# +---+--------------------------------------+
# |x  |a                                     |
# +---+--------------------------------------+
# |9  |{{1.1, [], , true}, {2.2, [], , true}}|
# +---+--------------------------------------+
df.printSchema()
# root
#  |-- x: integer (nullable = true)
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- one: double (nullable = true)
#  |    |    |-- two: array (nullable = true)
#  |    |    |    |-- element: string (containsNull = true)
#  |    |    |-- three: string (nullable = true)
#  |    |    |-- four: boolean (nullable = true)
#  |    |-- c: struct (nullable = true)
#  |    |    |-- one: double (nullable = true)
#  |    |    |-- two: array (nullable = true)
#  |    |    |    |-- element: string (containsNull = true)
#  |    |    |-- three: string (nullable = true)
#  |    |    |-- four: boolean (nullable = true)

Result:

dtype = df.schema['a'].dataType[0].dataType.simpleString()
df = df.withColumn('a', F.map_values(F.transform_values(
    F.from_json(F.to_json("a"), f'map<string,{dtype}>'), 
    lambda k, v: v.withField('struct_key', k)
)))

df.show(truncate=0)
# +---+--------------------------------------------+
# |x  |a                                           |
# +---+--------------------------------------------+
# |9  |[{1.1, [], , true, b}, {2.2, [], , true, c}]|
# +---+--------------------------------------------+
df.printSchema()
# root
#  |-- x: integer (nullable = true)
#  |-- a: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- one: double (nullable = true)
#  |    |    |-- two: array (nullable = true)
#  |    |    |    |-- element: string (containsNull = true)
#  |    |    |-- three: string (nullable = true)
#  |    |    |-- four: boolean (nullable = true)
#  |    |    |-- struct_key: string (nullable = false)
Answer 2 (l5tcr1uw)

Here is an approach that first adds struct_key to each inner struct and then builds an array from those structs.

# input
from pyspark.sql import functions as func  # alias used in the processing step
data_sdf = spark.createDataFrame([(((1, 2), (3, 4)), 'foo', 'bar')], 
                                 'a struct<b: struct<foo: int, bar: int>, c: struct<foo: int, bar: int>>, col2 string, col3 string'
                                 )

# +----------------+----+----+
# |               a|col2|col3|
# +----------------+----+----+
# |{{1, 2}, {3, 4}}| foo| bar|
# +----------------+----+----+

# root
#  |-- a: struct (nullable = true)
#  |    |-- b: struct (nullable = true)
#  |    |    |-- foo: integer (nullable = true)
#  |    |    |-- bar: integer (nullable = true)
#  |    |-- c: struct (nullable = true)
#  |    |    |-- foo: integer (nullable = true)
#  |    |    |-- bar: integer (nullable = true)
#  |-- col2: string (nullable = true)
#  |-- col3: string (nullable = true)

# processing
data_sdf. \
    selectExpr('a.*', *[c for c in data_sdf.columns if c not in ['a']]). \
    selectExpr(*['struct("{0}" as struct_key, {0}.*) as {0}'.format(c) for c in data_sdf.selectExpr('a.*').columns],
               *[c for c in data_sdf.columns if c not in ['a']]
               ). \
    withColumn('a', func.array(*data_sdf.selectExpr('a.*').columns)). \
    drop(*data_sdf.selectExpr('a.*').columns). \
    show(truncate=False)

# +----+----+----------------------+
# |col2|col3|a                     |
# +----+----+----------------------+
# |foo |bar |[{b, 1, 2}, {c, 3, 4}]|
# +----+----+----------------------+

# root
#  |-- col2: string (nullable = true)
#  |-- col3: string (nullable = true)
#  |-- a: array (nullable = false)
#  |    |-- element: struct (containsNull = false)
#  |    |    |-- struct_key: string (nullable = false)
#  |    |    |-- foo: integer (nullable = true)
#  |    |    |-- bar: integer (nullable = true)
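
To see what the second selectExpr in this answer actually receives, the expression strings can be generated on their own. This is a small sketch in plain Python: the two lists are hard-coded here as an assumption, whereas in the answer they come from data_sdf.selectExpr('a.*').columns and data_sdf.columns.

```python
# Field names of `a`; in the answer these come from data_sdf.selectExpr('a.*').columns.
inner_cols = ["b", "c"]
# Remaining top-level columns, i.e. data_sdf.columns without 'a'.
other_cols = ["col2", "col3"]

# Same template as in the answer: one SQL expression per inner struct.
template = 'struct("{0}" as struct_key, {0}.*) as {0}'
exprs = [template.format(c) for c in inner_cols] + other_cols

for e in exprs:
    print(e)
# struct("b" as struct_key, b.*) as b
# struct("c" as struct_key, c.*) as c
# col2
# col3
```

Each generated struct(...) expression rebuilds one inner struct with its own name as the first field, which is why struct_key comes first in this answer's output.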
