如何通过继承向Pyspark Dataframe类添加自定义方法

1yjd4xko  于 2023-01-26  发布在  Apache
关注(0)|答案(5)|浏览(159)

我尝试继承DataFrame类并添加额外的自定义方法,如下所示,这样我就可以流畅地链接,并确保所有方法引用相同的 Dataframe 。

from pyspark.sql.dataframe import DataFrame

class Myclass(DataFrame):
def __init__(self,df):
    super().__init__(df._jdf, df.sql_ctx)

def add_column3(self):
 // Add column1 to dataframe received
  self._jdf.withColumn("col3",lit(3))
  return self

def add_column4(self):
 // Add column to dataframe received
  self._jdf.withColumn("col4",lit(4))
  return self

if __name__ == "__main__":
'''
Spark Context initialization code
col1 col2
a 1
b 2
'''
  df = spark.createDataFrame([("a",1), ("b",2)], ["col1","col2"])
  myobj = MyClass(df)
  ## Trying to accomplish below where i can chain MyClass methods & Dataframe methods
  myobj.add_column3().add_column4().drop_columns(["col1"])

'''
Expected Output
col2, col3,col4
1,3,4
2,3,4
'''
xwbd5t1u

xwbd5t1u1#

实际上,您不需要继承DataFrame类来向DataFrame对象添加一些自定义方法。
在Python中,你可以添加一个自定义属性来 Package 你的方法,如下所示:

# decorator to attach a function to an attribute
def add_attr(cls):
    def decorator(func):
        @wraps(func)
        def _wrapper(*args, **kwargs):
            f = func(*args, **kwargs)
            return f

        setattr(cls, func.__name__, _wrapper)
        return func

    return decorator

# custom functions
def custom(self):
    @add_attr(custom)
    def add_column3():
        return self.withColumn("col3", lit(3))

    @add_attr(custom)
    def add_column4():
        return self.withColumn("col4", lit(4))

    return custom

# add new property to the Class pyspark.sql.DataFrame
DataFrame.custom = property(custom)

# use it
df.custom.add_column3().show()
tyg4sfes

tyg4sfes2#

blackbishop给出的答案值得一看,即使在撰写本文时还没有人支持它。这似乎是扩展Spark DataFrame类的一个很好的通用方法,我认为还有其他复杂对象。我将它稍微改写为:

from pyspark.sql.dataframe import DataFrame
from functools import wraps

# Create a decorator to add a function to a python object
def add_attr(cls):
    def decorator(func):
        @wraps(func)
        def _wrapper(*args, **kwargs):
            f = func(*args, **kwargs)
            return f

        setattr(cls, func.__name__, _wrapper)
        return func

    return decorator

  
# Extensions to the Spark DataFrame class go here
def dataframe_extension(self):
  @add_attr(dataframe_extension)
  def drop_fusion_gdpp_events():
    return(
      self
      .where(~((col('test1') == 'ABC') & (col('test2') =='XYZ')))
      .where(~col('test1').isin(['AAA', 'BBB']))
    )
  return dataframe_extension

DataFrame.dataframe_extension = property(dataframe_extension)
kx7yvsdv

kx7yvsdv3#

下面是我的解决方案(基于您的代码)。我不知道这是否是最佳实践,但至少正确地做到了您想要的。 Dataframe 是不可变的对象,所以在添加一个新列后,我们创建一个新对象,但不是Dataframe对象,而是Myclass对象,因为我们希望拥有 Dataframe 和自定义方法。

from pyspark.sql.dataframe import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

class MyClass(DataFrame):
   def __init__(self,df):
      super().__init__(df._jdf, df.sql_ctx)
      self._df = df

  def add_column3(self):
      #Add column1 to dataframe received
      newDf=self._df.withColumn("col3",F.lit(3))
      return MyClass(newDf)

  def add_column4(self):
      #Add column2 to dataframe received
      newDf=self._df.withColumn("col4",F.lit(4))
      return MyClass(newDf)

df = spark.createDataFrame([("a",1), ("b",2)], ["col1","col2"])
myobj = MyClass(df)
myobj.add_column3().add_column4().na.drop().show()

# Result:
+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|   a|   1|   3|   4|
|   b|   2|   3|   4|
+----+----+----+----+
ddarikpa

ddarikpa4#

我想你是在寻找这样的东西:

class dfc:
  def __init__(self, df):
    self.df = df
    
  def func(self, num):
    self.df = self.df.selectExpr(f"id * {num} AS id")
  
  def func1(self, num1):
    self.df = self.df.selectExpr(f"id * {num1} AS id")
    
  def dfdis(self):
    self.df.show()

在这个例子中,有一个 Dataframe 被传递给构造函数方法,它被类中定义的后续方法使用。每当相应的方法被调用时, Dataframe 的状态被存储在示例化对象中。

df = spark.range(10)

ob = dfc(df)

ob.func(2)

ob.func(2)

ob.dfdis()
oaxa6hgo

oaxa6hgo5#

注意:Pyspark在即将到来的版本中不支持df.sql_ctx,因此这个答案不适用于未来。
我喜欢许多其他的答案,但评论中有几个挥之不去的问题。我认为可以这样解决:

  • 我们需要把所有的东西都看作是不可变的,所以我们返回子类
  • 我们不需要在任何地方调用self._jdf--相反,只要将self当作DataFrame使用即可(因为它是DataFrame--这就是我们使用继承的原因!)
  • 我们需要显式构造一个新的类,因为从self.foo返回的值将是DataFrame基类型
  • 我已经添加了一个DataFrameExtender子类来中介新类的创建。子类将继承父构造函数,如果不被覆盖的话,所以我们可以整理DataFrame构造函数以获取DataFrame,并添加存储元数据的功能。

我们可以为数据到达的概念阶段创建一个新类,还可以添加辅助标志来帮助我们识别 Dataframe 中数据的状态,这里我在调用add column方法时添加一个标志,并推进所有现有的标志,你可以做任何你喜欢做的事情。
此设置意味着您可以创建一系列DataFrameExtender对象,例如:

  • RawData,实现.clean()方法,返回CleanedData
  • CleanedData,实现.normalize()方法,返回ModelReadyData
  • ModelReadyData,它实现.train(model).predict(model),或者.summarize(),它在模型中用作基本DataFrame对象。

通过将这些方法拆分成不同的类,这意味着我们不能在RawData上调用.train(),但是我们可以获取RawData对象并将.clean().normalize().train()链接在一起。这是一种类似函数的方法,但是使用不可变对象来帮助解释。
请注意,Spark中的DataFrame是延迟求值的,这对于这种方法来说非常好。所有这些代码只会生成一个最终的“未求值”DataFrame对象,该对象包含了“将要"执行的所有操作。我们不必担心内存或副本等问题。

from pyspark.sql.dataframe import DataFrame

class DataFrameExtender(DataFrame):
    def __init__(self,df,**kwargs):
        self.flags = kwargs
        super().__init__(df._jdf, df.sql_ctx)

class ColumnAddedData(DataFrameExtender):
    def add_column3(self):
        df_added_column = self.withColumn("col3", lit(3))
        return ColumnAddedData(df_added_column, with_col3=True, **self.flags)
    def add_column4(self):
        ## Add a bit of complexity: do not call again if we have already called this method
        if not self.flags['with_col4']:
            df_added_column = self.withColumn("col4", lit(4))
            return ColumnAddedData(df_added_column, with_col4=True, **self.flags)
        return self

相关问题