Pyspark:在UDF中传递多个列

brqmpdu1  于 2023-06-21  发布在  Spark
关注(0)|答案(8)|浏览(123)

我正在写一个用户定义的函数,它将接受 Dataframe 中除第一列外的所有列,并进行求和(或任何其他操作)。现在 Dataframe 有时可以有3列或4列或更多。它会有所不同。
我知道我可以硬编码4列名称作为传递在自定义框架,但在这种情况下,它会有所不同,所以我想知道如何得到它?
这里有两个例子,在第一个例子中,我们有两列要添加,在第二个例子中,我们有三列要添加。

f1tvaqid

f1tvaqid1#

如果要传递给UDF的所有列都具有相同的数据类型,则可以使用array作为输入参数,例如:

>>> from pyspark.sql.types import IntegerType
>>> from pyspark.sql.functions import udf, array
>>> sum_cols = udf(lambda arr: sum(arr), IntegerType())
>>> spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B']) \
...     .withColumn('Result', sum_cols(array('A', 'B'))).show()
+---+---+---+------+
| ID|  A|  B|Result|
+---+---+---+------+
|101|  1| 16|    17|
+---+---+---+------+

>>> spark.createDataFrame([(101, 1, 16, 8)], ['ID', 'A', 'B', 'C'])\
...     .withColumn('Result', sum_cols(array('A', 'B', 'C'))).show()
+---+---+---+---+------+
| ID|  A|  B|  C|Result|
+---+---+---+---+------+
|101|  1| 16|  8|    25|
+---+---+---+---+------+
s8vozzvw

s8vozzvw2#

另一个简单的方法,没有数组和结构。

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def sum(x, y):
    return x + y

sum_cols = udf(sum, IntegerType())

a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols('A', 'B')).show()
b4wnujal

b4wnujal3#

使用结构而不是数组

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct
sum_cols = udf(lambda x: x[0]+x[1], IntegerType())
a=spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])
a.show()
a.withColumn('Result', sum_cols(struct('A', 'B'))).show()
ki0zmccv

ki0zmccv4#

也许这是一个迟来的答案,但我不喜欢在没有必要的情况下使用UDF,所以:

from pyspark.sql.functions import col
from functools import reduce
data = [["a",1,2,5],["b",2,3,7],["c",3,4,8]]
df = spark.createDataFrame(data,["id","v1","v2",'v3'])

calculate = reduce(lambda a, x: a+x, map(col, ["v1","v2",'v3']))

df.withColumn("Result", calculate)
#
#id v1  v2  v3  Result
#a  1   2   5   8
#b  2   3   7   12
#c  3   4   8   15

这里你可以使用Column中实现的任何操作。另外,如果你想编写一个具有特定逻辑的自定义udf,你可以使用它,因为Column提供了树执行操作。而不需要对它进行数组和求和。
如果与process as array操作相比,从性能的Angular 来看,它会很糟糕,让我们来看看物理计划,在我的情况下和array情况下,在我的情况下和array情况下。
我的案例:

== Physical Plan ==
*(1) Project [id#355, v1#356L, v2#357L, v3#358L, ((v1#356L + v2#357L) + v3#358L) AS Result#363L]
+- *(1) Scan ExistingRDD[id#355,v1#356L,v2#357L,v3#358L]

array case:

== Physical Plan ==
*(2) Project [id#339, v1#340L, v2#341L, v3#342L, pythonUDF0#354 AS Result#348]
+- BatchEvalPython [<lambda>(array(v1#340L, v2#341L, v3#342L))], [pythonUDF0#354]
   +- *(1) Scan ExistingRDD[id#339,v1#340L,v2#341L,v3#342L]

如果可能-我们需要避免使用UDF,因为Catalyst不知道如何优化这些UDF

gpfsuwkq

gpfsuwkq5#

如果您不想键入所有的列名,而只是将所有列转储到UDF中,则需要在结构中 Package 一个列表解析。

from pyspark.sql.functions import struct, udf
sum_udf = udf(lambda x: sum(x[1:]))
df_sum = df.withColumn("result", sum_udf(struct([df[col] for col in df.columns])))
z9ju0rcb

z9ju0rcb6#

这是我尝试过的方法,似乎是有效的:

colsToSum = df.columns[1:]
df_sum = df.withColumn("rowSum", sum([df[col] for col in colsToSum]))
fzwojiic

fzwojiic7#

udf_ = spark.udf.register("udf_",self.funct)
            print("registered udf................:",udf_)
            df = df.withColumn('result',udf_(struct([df[col] for col in df.columns]))) 
            print("after df call")

其中self.funct是在另一个类中定义的,我试图使用spark.udf.register注册这个函数,并从df.withColumn调用这个函数,返回结果不起作用。
输出:已注册的udf................:DF<function function.funct at 0x7f8ee4c5bf28>呼叫后
但真实的上这并不是进入函数类的funct函数。
function class如下:definit:def funct(self,df):print(“inside funct function”)return F.col(S)*F.col(S)
S列是df Dataframe 和int的一部分

yfwxisqw

yfwxisqw8#

现在你不再需要struct了:

from pyspark.sql.functions import udf

@udf('integer')
def my_sum(*args):
  return sum(args)

df = spark.createDataFrame([(101, 1, 16)], ['ID', 'A', 'B'])

display(df.withColumn('total', my_sum(*df.columns[1:])))

相关问题