pyspark 在Spark Dataframe 字段中创建包含数组中每个结构的第一个元素的数组

khbbv19g  于 2022-12-11  发布在  Spark
关注(0)|答案(3)|浏览(167)

在PySpark Dataframe 中,如何从一个结构体数组转换到每个结构体的第一个元素的数组?
下面的例子会让我们更清楚地了解这一点。

scoresheet = spark.createDataFrame([("Alice", [("Math",100),("English",80)]),("Bob", [("Math", 90)]),("Charlie", [])],["name","scores"])

上面定义的模式和 Dataframe 如下所示:

root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: long (nullable = true)

+-------+--------------------------+
|name   |scores                    |
+-------+--------------------------+
|Alice  |[[Math,100], [English,80]]|
|Bob    |[[Math,90]]               |
|Charlie|[]                        |
+-------+--------------------------+

你可以看到,每个学生的科目分数都包含在一个(Subject,Marks)类型的有序结构体中,每个学生的科目数不是常数,可能是零。
接下来,我将生成一个新的 Dataframe ,其中只包含每个学生的主题,而不包含分数。它应该为没有主题的学生生成一个空数组。简而言之,它应该如下所示:

+-------+---------------+
|name   |scores         |
+-------+---------------+
|Alice  |[Math, English]|
|Bob    |[Math]         |
|Charlie|[]             |
+-------+---------------+

请注意,行数与前面相同;所以我不能使用explode,除非我在之后重新组合,这在计算上效率很低。

voj3qocg

voj3qocg1#

最好的方法是udf:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

take_first = udf(lambda rows: [row[0] for row in rows], ArrayType(StringType()))

scoresheet.withColumn("scores", take_first("scores"))
06odsfpq

06odsfpq2#

作为参考,下面是包含分解、分组依据和聚合的版本。

import pyspark.sql.functions as f

scoresheet.select('name').join(
    scoresheet
    .withColumn('score', f.explode('scores'))
    .withColumn('subject', f.col('score').getField('_1'))
    .groupBy('name')
    .agg(f.collect_list('subject').alias('subjects'))
    , on='name'
    , how='left'
)

由于这是在PySpark中完成的,所以在某些情况下,如果它比UDF版本快,我不会感到惊讶,但我没有做任何分析。左连接是为了确保没有结果的学生不会在最终结果中被丢弃。

2ic8powd

2ic8powd3#

这将获得预期的结果:

from pyspark.sql import functions as F

scoresheet.select(F.col("scores._1").alias("subject"))

输出为:

+-------+----------------------------+---------------+
|name   |scores                      |subject        |
+-------+----------------------------+---------------+
|Alice  |[[Math, 100], [English, 80]]|[Math, English]|
|Bob    |[[Math, 90]]                |[Math]         |
|Charlie|[]                          |[]             |
+-------+----------------------------+---------------+

_1是数组元素的结构字段名,如记分表架构所示:

root
 |-- name: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: string (nullable = true)
 |    |    |-- _2: long (nullable = true)

作为一般规则,.select("<column_name>.<field_name>")从结构体或结构体数组中提取字段。
由于DataFrame构造中未提供架构,因此scores列中得字段获得了自动生成得名称_1_2 .若要提高可读性,可以将它们指定为subjectmark,然后使用scores.subjectscores.mark访问它们:

from pyspark.sql import types as T
from pyspark.sql import functions as F

scoresheet = spark.createDataFrame([
        ("Alice", [("Math", 100), ("English", 80)]),
        ("Bob", [("Math", 90)]),
        ("Charlie", []),
    ],
    schema=T.StructType([
        T.StructField("name", T.StringType()),
        T.StructField("scores", T.ArrayType(T.StructType([
                T.StructField(name="subject", dataType=T.StringType()),
                T.StructField(name="mark", dataType=T.IntegerType()),
        ])))
    ]),
)
scoresheet.select("scores.subject")

相关问题