给定一个 Dataframe ,其中每一行都是一个结构体,answers
字段保存一个答案结构体数组,每个结构体都有多个字段,下面的代码应该处理数组中的每个答案,方法是检查其text
字段并对其应用一些处理。“(注意,这段代码是针对运行Glue 3.0的AWS Glue笔记本的,但除了spark上下文创建之外,它应该可以在任何PySpark〉= 3.1上工作):
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
import pyspark.sql.types as T
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
test_schema = T.StructType([
T.StructField('answer_id', T.StringType(), True),
T.StructField('answers', T.ArrayType(
T.StructType([
T.StructField('text', T.StringType(), True)
]), True), True)
])
json_df = spark.createDataFrame(data=[
[1, [("a1",),("a2",),("a3",)]],
[2, [("b1",),("b2",),("b3",)]]
], schema=test_schema)
json_df.show(truncate=False)
json_df.printSchema()
导致:
+---------+------------------+
|answer_id|answers |
+---------+------------------+
|1 |[{a1}, {a2}, {a3}]|
|2 |[{b1}, {b2}, {b3}]|
+---------+------------------+
root
|-- answer_id: string (nullable = true)
|-- answers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = true)
然后运行转换:
@F.udf(returnType=T.StringType())
def process_text(text):
if text:
return "foo: " + str(text)
return "no text"
def normalize_answer(answer):
return answer.withField(
"processed_text_input",
answer.getField("text")
).withField(
"processed_text",
process_text(answer.getField("text"))
)
json_mod_df = json_df.withColumn(
"answers",
F.transform("answers", normalize_answer)
)
json_mod_df.show(truncate=False)
导致:
+---------+---------------------------------------------------------+
|answer_id|answers |
+---------+---------------------------------------------------------+
|1 |[{a1, a1, no text}, {a2, a2, no text}, {a3, a3, no text}]|
|2 |[{b1, b1, no text}, {b2, b2, no text}, {b3, b3, no text}]|
+---------+---------------------------------------------------------+
在我的实际实现中,process_text
更加复杂,因此整个过程不能用定义转换的lambda来表示。
问题是,当我在一个大得多的答案数据集上运行这个函数时,processed_text_input
与processed_text
字段是完全不同的文本,即processed_text
中“foo:“似乎来自一个完全不同的答案结构体,从输入数据框中的其他项目的不同数组中。有时它也会一起丢失。在这个玩具例子中,文本似乎只是丢失了。我尝试添加100行,但在所有情况下文本都丢失了。我不知道为什么在我的完整版本的这段代码中,大约有50k行,我从随机行中获得文本。无论如何,这个玩具问题中显示的文本也不是我想要的,processed_text_input
中的值是相应答案文本字段中的正确值。
这是一个bug还是我应该改变这个表达式的结构?我知道我可以分解答案数组并以更传统的方式处理它,但我尝试使用最新的transform
和withField
函数。
1条答案
按热度按时间wvyml7n51#
这将有助于如果你能分享更多的信息,如样本数据集.
我试着重现你的问题但是
array -> transform
的lambda函数内部的udf
调用导致了一些无效的状态,可能是函数参数在求值后消失了。我想知道,为什么你的代码运行得很好。无论如何,如果你能分解
answers
数组并在单个answer
结构体上应用转换,那么你也可以从Spark的分布式处理中获益。使用的示例数据集是具有3个文本常量的数组:
输出: