使PySpark UDF访问Python类中的示例变量

zc0qhyus  于 2022-11-01  发布在  Spark
关注(0)|答案(1)|浏览(126)

我尝试应用PySpark UDF在类中向PySpark DataFrame添加一个新列。Spark UDF必须是一个静态方法,才能在类中使用。下面的伪示例Case 1工作正常。
问题是,一旦UDF被定义为静态方法,我就不能在其中使用任何示例变量。因此,Case 2不起作用。

问题:我的问题不是关于案例2失败的原因。我想知道是否有任何方法可以让PySpark UDF访问上面例子中的示例变量?我知道this的解决方法,在这个例子中,UDF是在调用UDF(calculate_new_marks())的方法内部定义的。寻找替代方法。
案例1(这是可行的!):静态方法UDF没有使用任何示例变量。它只是给每个学生的分数加10分。

import numpy as np
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

class example():
    def __init__(self):
        self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
        self.increase = 10

    def create_spark_df(self):
        cSchema = StructType([StructField("Name", StringType())\
                             ,StructField("Marks", IntegerType())])
        return spark.createDataFrame(self.students, schema=cSchema)

    @staticmethod
    @udf(returnType=IntegerType())
    def add_ten_marks(marks):
        return marks + 10    

    def calculate_new_marks(self):
        df = self.create_spark_df()
        df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
        return df

c = example()
c.calculate_new_marks().show()

+---------+-----+---------+
|     Name|Marks|New Marks|
+---------+-----+---------+
|student_0|    2|       12|
|student_1|   42|       52|
|student_2|   11|       21|
+---------+-----+---------+

情况2(此操作失败!):静态方法UDF带有,使用任何示例变量(此处为self.increase)。

import numpy as np
from pyspark.sql.types import StringType, IntegerType, StructType, StructField

class example():
    def __init__(self):
        self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
        self.increase = 10

    def create_spark_df(self):
        cSchema = StructType([StructField("Name", StringType())\
                             ,StructField("Marks", IntegerType())])
        return spark.createDataFrame(self.students, schema=cSchema)

    @staticmethod
    @udf(returnType=IntegerType())
    def add_ten_marks(marks):
        return marks + self.increase # <--- constant replaced by instance variable. Problematic Line!!!

    def calculate_new_marks(self):
        df = self.create_spark_df()
        df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
        return df

c = example()
c.calculate_new_marks().show()

>>> PythonException: An exception was thrown from a UDF: 'NameError: name 'self' is not defined'
nwsw7zdq

nwsw7zdq1#

Spark UDF应该是自包含的,在worker上序列化和执行,而不是在driver上,这就是为什么它应该是静态的。所以,你可以尝试下面的替代方法,
您可以将变量increase作为参数传递给udf,如下所示。
第一个

相关问题