在将列(结构数组)添加到Dataframe之后,我想在这个添加的列上运行一个udf。现在,我不能通过结构的名称访问它们的属性,但是我可以通过它们的索引访问它们。
但是,如果缓存Dataframe,则按属性名称访问将开始工作。
以下是可复制代码:
import pyspark.sql.functions as spf
import pyspark.sql.types as spt
df = spark.createDataFrame([{"something": 1}])
tuple_schema = spt.ArrayType(
elementType=spt.StructType([spt.StructField("x", spt.FloatType()),
spt.StructField("y", spt.FloatType())]))
def generate_tuples():
return [(3.0, 4.0)]
tuple_udf = spf.udf(generate_tuples, tuple_schema)
df = df.withColumn("our_tuples", tuple_udf())
df.collect()
# [Row(something=1, our_tuples=[Row(x=3.0, y=4.0)])]
index_udf = spf.udf(lambda lst: max([z[1] for z in lst]) \
if len(lst) > 0 else 0.0, spt.FloatType())
attribute_udf = spf.udf(lambda lst: max([z.y for z in lst]) \
if len(lst) > 0 else 0.0, spt.FloatType())
这样做有效:✔️
index_df = df.withColumn("m", index_udf(df.our_tuples))
index_df.collect()
这不起作用:❌
attribute_df = df.withColumn("m", attribute_udf(df.our_tuples))
attribute_df.collect()
# AttributeError: 'tuple' object has no attribute 'y'
这同样有效:✔️
df.cache()
df.count()
attribute_df = df.withColumn("m", attribute_udf(df.our_tuples))
attribute_df.collect()
这是Spark虫,还是我不知道的预期行为?
暂无答案!
目前还没有任何答案,快来回答吧!