为嵌套的pyspark对象创建模式

1wnzp6jl  于 2021-07-13  发布在  Spark
关注(0)|答案(1)|浏览(428)

我试图创建一些示例Dataframe来测试一些函数,定期得到带有嵌套对象(数组或更多json对象)的json对象,我需要针对不同的类型测试这些对象,即 Struct 以及 Array 并根据它们的类型将它们传递给正确的函数,以创建表格Dataframe。
这些对象来自API一些内部的,一些外部的,所以我任由应用程序开发人员摆布。
假设我想创建一个表,如下所示进行测试:

+----+------+------------------------------+
|    |   id | arr                          |
|----+------+------------------------------|
|  0 |    1 | [[0, 1, 2, 3], [4, 5, 6, 7]] |
|  1 |    2 | [[1, 2, 3], [4, 5, 6]]       |
+----+------+------------------------------+

我的假设是,我需要创建一个模式,如下所示:

from pyspark.sql.types import StructField,StructType,StringType,IntegerType,ArrayType

schema = StructType([
  StructField('id', IntegerType(),True),
  StructField('arr', ArrayType(ArrayType(IntegerType(),True),True),True)

])

data = [ [1,2], #< id.
 [[ [0,1,2,3], [4,5,6,7]], # < arr
        [[1,2,3,],[4,5,6]]] 
]

df = spark.createDataFrame(data,schema)

返回一个 TypeError :

field arr: ArrayType(IntegerType,true) can not accept object 2 in type <class 'int'>

我哪里出错了?
当所有这些都说了又做了之后,当我通过一个递归函数传递它们时,我将得到输出:

+----+------+-------+
|    |   id |   arr |
|----+------+-------|
|  0 |    1 |     0 |
|  0 |    1 |     1 |
|  0 |    1 |     2 |
|  0 |    1 |     3 |
|  0 |    1 |     4 |
|  0 |    1 |     5 |
|  0 |    1 |     6 |
|  0 |    1 |     7 |
|  1 |    2 |     1 |
|  1 |    2 |     2 |
|  1 |    2 |     3 |
|  1 |    2 |     4 |
|  1 |    2 |     5 |
|  1 |    2 |     6 |
+----+------+-------+
aiqt4smr

aiqt4smr1#

名单 data 应该包含行列表,而不是列列表。

from pyspark.sql.types import StructField,StructType,StringType,IntegerType,ArrayType

schema = StructType([
  StructField('id', IntegerType(),True),
  StructField('arr', ArrayType(ArrayType(IntegerType(),True),True),True)
])

data = [
    [1, [[0,1,2,3], [4,5,6,7]] ],
    [2, [[1,2,3,],[4,5,6]] ]
]

df = spark.createDataFrame(data,schema)

df.show(truncate=False)
+---+----------------------------+
|id |arr                         |
+---+----------------------------+
|1  |[[0, 1, 2, 3], [4, 5, 6, 7]]|
|2  |[[1, 2, 3], [4, 5, 6]]      |
+---+----------------------------+

要分解阵列,可以执行以下操作:

import pyspark.sql.functions as F

df.withColumn('arr', F.explode(F.flatten('arr'))).show()
+---+---+
| id|arr|
+---+---+
|  1|  0|
|  1|  1|
|  1|  2|
|  1|  3|
|  1|  4|
|  1|  5|
|  1|  6|
|  1|  7|
|  2|  1|
|  2|  2|
|  2|  3|
|  2|  4|
|  2|  5|
|  2|  6|
+---+---+

相关问题