如何修复这个修改数组中复杂结构的Pyspark

ruarlubt  于 2022-12-26  发布在  Spark
关注(0)|答案(1)|浏览(126)

给定一个 Dataframe ,其中每一行都是一个结构体,answers字段保存一个答案结构体数组,每个结构体都有多个字段,下面的代码应该处理数组中的每个答案,方法是检查其text字段并对其应用一些处理。“(注意,这段代码是针对运行Glue 3.0的AWS Glue笔记本的,但除了spark上下文创建之外,它应该可以在任何PySpark〉= 3.1上工作):

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
import pyspark.sql.types as T

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

test_schema = T.StructType([
    T.StructField('answer_id', T.StringType(), True),
    T.StructField('answers', T.ArrayType(
        T.StructType([
            T.StructField('text', T.StringType(), True)
        ]), True), True)
    ])

json_df = spark.createDataFrame(data=[
    [1, [("a1",),("a2",),("a3",)]],
    [2, [("b1",),("b2",),("b3",)]]
], schema=test_schema)
json_df.show(truncate=False)
json_df.printSchema()

导致:

+---------+------------------+
|answer_id|answers           |
+---------+------------------+
|1        |[{a1}, {a2}, {a3}]|
|2        |[{b1}, {b2}, {b3}]|
+---------+------------------+

root
 |-- answer_id: string (nullable = true)
 |-- answers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)

然后运行转换:

@F.udf(returnType=T.StringType())
def process_text(text):
    if text:
        return "foo: " + str(text)
    return "no text"

def normalize_answer(answer):
    return answer.withField(
        "processed_text_input",
        answer.getField("text")
    ).withField(
        "processed_text",
        process_text(answer.getField("text"))
    )

json_mod_df = json_df.withColumn(
    "answers", 
    F.transform("answers", normalize_answer)
)

json_mod_df.show(truncate=False)

导致:

+---------+---------------------------------------------------------+
|answer_id|answers                                                  |
+---------+---------------------------------------------------------+
|1        |[{a1, a1, no text}, {a2, a2, no text}, {a3, a3, no text}]|
|2        |[{b1, b1, no text}, {b2, b2, no text}, {b3, b3, no text}]|
+---------+---------------------------------------------------------+

在我的实际实现中,process_text更加复杂,因此整个过程不能用定义转换的lambda来表示。
问题是,当我在一个大得多的答案数据集上运行这个函数时,processed_text_inputprocessed_text字段是完全不同的文本,即processed_text中“foo:“似乎来自一个完全不同的答案结构体,从输入数据框中的其他项目的不同数组中。有时它也会一起丢失。在这个玩具例子中,文本似乎只是丢失了。我尝试添加100行,但在所有情况下文本都丢失了。我不知道为什么在我的完整版本的这段代码中,大约有50k行,我从随机行中获得文本。无论如何,这个玩具问题中显示的文本也不是我想要的,processed_text_input中的值是相应答案文本字段中的正确值。
这是一个bug还是我应该改变这个表达式的结构?我知道我可以分解答案数组并以更传统的方式处理它,但我尝试使用最新的transformwithField函数。

wvyml7n5

wvyml7n51#

这将有助于如果你能分享更多的信息,如样本数据集.
我试着重现你的问题但是array -> transform的lambda函数内部的udf调用导致了一些无效的状态,可能是函数参数在求值后消失了。我想知道,为什么你的代码运行得很好。
无论如何,如果你能分解answers数组并在单个answer结构体上应用转换,那么你也可以从Spark的分布式处理中获益。

import pyspark.sql.functions as F
import pyspark.sql.types as T

json_df = spark.createDataFrame(data=[[[("txt1",),("txt2",),("txt3",)]]], schema="answers array<struct<text string>>")
json_df.show(truncate=False)
json_df.printSchema()

@F.udf(returnType=T.StringType())
def process_text(text):
    return "foo: " + text

def normalize_answer(answer):
    return answer.withField(
        "processed_text",
        process_text(answer.getField("text"))
    ).withField(
        "processed_text_input",
        answer.getField("text")
    )

json_df = json_df.withColumn("answers", F.explode("answers"))

json_df = json_df.withColumn("processed_text", F.col("answers").getField("text"))
json_df = json_df.withColumn("processed_text_input", process_text(F.col("answers").getField("text")))

json_df.show(truncate=False)

使用的示例数据集是具有3个文本常量的数组:

+------------------------+
|answers                 |
+------------------------+
|[{txt1}, {txt2}, {txt3}]|
+------------------------+

root
 |-- answers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)

输出:

+-------+--------------+--------------------+
|answers|processed_text|processed_text_input|
+-------+--------------+--------------------+
|{txt1} |txt1          |foo: txt1           |
|{txt2} |txt2          |foo: txt2           |
|{txt3} |txt3          |foo: txt3           |
+-------+--------------+--------------------+

相关问题