from pyspark.sql.functions import struct, udf
sum_udf = udf(lambda x: sum(x[1:]))
df_sum = df.withColumn("result", sum_udf(struct([df[col] for col in df.columns])))
udf_ = spark.udf.register("udf_",self.funct)
print("registered udf................:",udf_)
df = df.withColumn('result',udf_(struct([df[col] for col in df.columns])))
print("after df call")
其中self.funct是在另一个类中定义的,我试图使用spark.udf.register注册这个函数,并从df.withColumn调用这个函数,返回结果不起作用。 输出:已注册的udf................:DF<function function.funct at 0x7f8ee4c5bf28>呼叫后 但真实的上这并不是进入函数类的funct函数。 function class如下:definit:def funct(self,df):print(“inside funct function”)return F.col(S)*F.col(S) S列是df Dataframe 和int的一部分
8条答案
按热度按时间f1tvaqid1#
如果要传递给UDF的所有列都具有相同的数据类型,则可以使用array作为输入参数,例如:
s8vozzvw2#
另一个简单的方法,没有数组和结构。
b4wnujal3#
使用结构而不是数组
ki0zmccv4#
也许这是一个迟来的答案,但我不喜欢在没有必要的情况下使用UDF,所以:
这里你可以使用
Column
中实现的任何操作。另外,如果你想编写一个具有特定逻辑的自定义udf
,你可以使用它,因为Column
提供了树执行操作。而不需要对它进行数组和求和。如果与process as array操作相比,从性能的Angular 来看,它会很糟糕,让我们来看看物理计划,在我的情况下和array情况下,在我的情况下和
array
情况下。我的案例:
array case:
如果可能-我们需要避免使用UDF,因为Catalyst不知道如何优化这些UDF
gpfsuwkq5#
如果您不想键入所有的列名,而只是将所有列转储到UDF中,则需要在结构中 Package 一个列表解析。
z9ju0rcb6#
这是我尝试过的方法,似乎是有效的:
fzwojiic7#
其中self.funct是在另一个类中定义的,我试图使用spark.udf.register注册这个函数,并从df.withColumn调用这个函数,返回结果不起作用。
输出:已注册的udf................:DF<function function.funct at 0x7f8ee4c5bf28>呼叫后
但真实的上这并不是进入函数类的funct函数。
function class如下:definit:def funct(self,df):print(“inside funct function”)return F.col(S)*F.col(S)
S列是df Dataframe 和int的一部分
yfwxisqw8#
现在你不再需要
struct
了: