如何实现一个自定义的pyspark分解(对于结构数组),一次分解4列?

lb3vh1jj  于 2021-05-24  发布在  Spark
关注(0)|答案(1)|浏览(496)

我正在尝试在pyspark中实现一个自定义爆炸。我有4列,它们是具有几乎相同模式的结构数组(一列结构包含的字段比其他三列少一个)。
对于dataframe中的每一行,我有4列,它们是结构的数组。栏目包括学生、助教、教师、管理员。
学生、助教和教师是一个具有场的结构阵列 id , student_level 以及 name .
例如,这里是dataframe中的一个示例行。

“学生”、“助教”和“教师”结构都具有相同的模式(“id”、“学生级别”、“名称”),administrators结构具有“id”和“名称”字段,但缺少“学生级别”。
我想执行一个自定义分解,这样每一行我都有一个条目,每个学生,助教,教授和管理员,以及原始列名,以防我不得不搜索“人类型”。因此,对于上面一行的屏幕截图,输出将是8行:

+-----------+---------------------+----+---------------+----------+
| School_id |        type         | id | student_level |   name   |
+-----------+---------------------+----+---------------+----------+
|      1999 | students            |  1 | 0             | Brian    |
|      1999 | students            |  9 | 2             | Max      |
|      1999 | teaching_assistants | 19 | 0             | Xander   |
|      1999 | teachers            | 21 | 0             | Charlene |
|      1999 | teachers            | 12 | 2             | Rob      |
|      1999 | administrators      | 23 | None          | Marsha   |
|      1999 | administrators      | 11 | None          | Ryan     |
|      1999 | administrators      | 14 | None          | Bob      |
+-----------+---------------------+----+---------------+----------+

对于管理员来说,student\ u level列只能为空。问题是,如果我使用explode函数,我会将所有这些项放在不同的列中。
有可能在Pypark中实现这一点吗?我的一个想法是找出如何将4个数组列组合成1个数组,然后对数组进行分解,尽管我不确定组合结构数组并将列名作为字段是否可行(我尝试过各种方法),而且我也不知道如果管理员缺少一个字段,该方法是否有效。
在过去,我通过转换为rdd并使用flatmap/自定义udf来实现这一点,但是对于数百万行来说效率非常低。

0lvr5msh

0lvr5msh1#

其思想是使用堆栈来转换列 students , teaching_assistants , teachers 以及 administrators 分为不同的行,每个行的值都正确 type . 之后,可以分解包含数据的列,然后将单个结构的元素转换为单独的列。
使用 stack 要求堆叠的所有列具有相同的类型。这意味着所有列必须包含相同结构的数组,并且结构的所有元素的可空性必须匹配。因此 administrators 列必须首先转换为正确的结构类型。

df.withColumn("administrators", F.expr("transform(administrators, " +
        "a -> if(1<2,named_struct('id', a.id, 'name', a.name, 'student_level', "+
              "cast(null as long)),null))"))\
  .select("School_id", F.expr("stack(4, 'students', students, "+
          "'teaching_assistants', teaching_assistants, 'teachers', teachers, "+
          "'administrators', administrators) as (type, temp1)")) \
  .withColumn("temp2", F.explode("temp1"))\
  .select("School_id", "type", "temp2.id", "temp2.name", "temp2.student_level")\
  .show()

印刷品

+---------+-------------------+---+--------+-------------+
|School_id|               type| id|    name|student_level|
+---------+-------------------+---+--------+-------------+
|     1999|           students|  1|   Brian|            0|
|     1999|           students|  9|     Max|            2|
|     1999|teaching_assistants| 19|  Xander|            0|
|     1999|           teachers| 21|Charlene|            0|
|     1999|           teachers| 12|     Rob|            2|
|     1999|     administrators| 23|  Marsha|         null|
|     1999|     administrators| 11|    Ryan|         null|
|     1999|     administrators| 14|     Bob|         null|
+---------+-------------------+---+--------+-------------+

奇怪的样子 if(1<2, named_struct(...), null) 在第一行中,需要为 administrators 数组。
此解决方案适用于spark 2.4+。如果有可能改变 administrators 在前面的步骤中,此解决方案也适用于早期版本。

相关问题