我尝试应用PySpark UDF在类中向PySpark DataFrame添加一个新列。Spark UDF必须是一个静态方法,才能在类中使用。下面的伪示例Case 1工作正常。
问题是,一旦UDF被定义为静态方法,我就不能在其中使用任何示例变量。因此,Case 2不起作用。
问题:我的问题不是关于案例2失败的原因。我想知道是否有任何方法可以让PySpark UDF访问上面例子中的示例变量?我知道this的解决方法,在这个例子中,UDF是在调用UDF(calculate_new_marks()
)的方法内部定义的。寻找替代方法。
案例1(这是可行的!):静态方法UDF没有使用任何示例变量。它只是给每个学生的分数加10分。
import numpy as np
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
class example():
def __init__(self):
self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
self.increase = 10
def create_spark_df(self):
cSchema = StructType([StructField("Name", StringType())\
,StructField("Marks", IntegerType())])
return spark.createDataFrame(self.students, schema=cSchema)
@staticmethod
@udf(returnType=IntegerType())
def add_ten_marks(marks):
return marks + 10
def calculate_new_marks(self):
df = self.create_spark_df()
df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
return df
c = example()
c.calculate_new_marks().show()
+---------+-----+---------+
| Name|Marks|New Marks|
+---------+-----+---------+
|student_0| 2| 12|
|student_1| 42| 52|
|student_2| 11| 21|
+---------+-----+---------+
情况2(此操作失败!):静态方法UDF带有,使用任何示例变量(此处为self.increase
)。
import numpy as np
from pyspark.sql.types import StringType, IntegerType, StructType, StructField
class example():
def __init__(self):
self.students = [[f'student_{i}', np.random.randint(80)] for i in range(3)]
self.increase = 10
def create_spark_df(self):
cSchema = StructType([StructField("Name", StringType())\
,StructField("Marks", IntegerType())])
return spark.createDataFrame(self.students, schema=cSchema)
@staticmethod
@udf(returnType=IntegerType())
def add_ten_marks(marks):
return marks + self.increase # <--- constant replaced by instance variable. Problematic Line!!!
def calculate_new_marks(self):
df = self.create_spark_df()
df = df.withColumn("New Marks", self.add_ten_marks(col("Marks")))
return df
c = example()
c.calculate_new_marks().show()
>>> PythonException: An exception was thrown from a UDF: 'NameError: name 'self' is not defined'
1条答案
按热度按时间nwsw7zdq1#
Spark UDF应该是自包含的,在worker上序列化和执行,而不是在driver上,这就是为什么它应该是静态的。所以,你可以尝试下面的替代方法,
您可以将变量increase作为参数传递给udf,如下所示。
第一个