我尝试继承DataFrame类并添加额外的自定义方法,如下所示,这样我就可以流畅地链接,并确保所有方法引用相同的 Dataframe 。
from pyspark.sql.dataframe import DataFrame
class Myclass(DataFrame):
def __init__(self,df):
super().__init__(df._jdf, df.sql_ctx)
def add_column3(self):
// Add column1 to dataframe received
self._jdf.withColumn("col3",lit(3))
return self
def add_column4(self):
// Add column to dataframe received
self._jdf.withColumn("col4",lit(4))
return self
if __name__ == "__main__":
'''
Spark Context initialization code
col1 col2
a 1
b 2
'''
df = spark.createDataFrame([("a",1), ("b",2)], ["col1","col2"])
myobj = MyClass(df)
## Trying to accomplish below where i can chain MyClass methods & Dataframe methods
myobj.add_column3().add_column4().drop_columns(["col1"])
'''
Expected Output
col2, col3,col4
1,3,4
2,3,4
'''
5条答案
按热度按时间xwbd5t1u1#
实际上,您不需要继承DataFrame类来向DataFrame对象添加一些自定义方法。
在Python中,你可以添加一个自定义属性来 Package 你的方法,如下所示:
tyg4sfes2#
blackbishop给出的答案值得一看,即使在撰写本文时还没有人支持它。这似乎是扩展Spark DataFrame类的一个很好的通用方法,我认为还有其他复杂对象。我将它稍微改写为:
kx7yvsdv3#
下面是我的解决方案(基于您的代码)。我不知道这是否是最佳实践,但至少正确地做到了您想要的。 Dataframe 是不可变的对象,所以在添加一个新列后,我们创建一个新对象,但不是
Dataframe
对象,而是Myclass
对象,因为我们希望拥有 Dataframe 和自定义方法。ddarikpa4#
我想你是在寻找这样的东西:
在这个例子中,有一个 Dataframe 被传递给构造函数方法,它被类中定义的后续方法使用。每当相应的方法被调用时, Dataframe 的状态被存储在示例化对象中。
oaxa6hgo5#
注意:Pyspark在即将到来的版本中不支持df.sql_ctx,因此这个答案不适用于未来。
我喜欢许多其他的答案,但评论中有几个挥之不去的问题。我认为可以这样解决:
self._jdf
--相反,只要将self当作DataFrame使用即可(因为它是DataFrame--这就是我们使用继承的原因!)self.foo
返回的值将是DataFrame基类型我们可以为数据到达的概念阶段创建一个新类,还可以添加辅助标志来帮助我们识别 Dataframe 中数据的状态,这里我在调用add column方法时添加一个标志,并推进所有现有的标志,你可以做任何你喜欢做的事情。
此设置意味着您可以创建一系列DataFrameExtender对象,例如:
RawData
,实现.clean()
方法,返回CleanedData
CleanedData
,实现.normalize()
方法,返回ModelReadyData
ModelReadyData
,它实现.train(model)
和.predict(model)
,或者.summarize()
,它在模型中用作基本DataFrame对象。通过将这些方法拆分成不同的类,这意味着我们不能在
RawData
上调用.train()
,但是我们可以获取RawData
对象并将.clean().normalize().train()
链接在一起。这是一种类似函数的方法,但是使用不可变对象来帮助解释。请注意,Spark中的DataFrame是延迟求值的,这对于这种方法来说非常好。所有这些代码只会生成一个最终的“未求值”DataFrame对象,该对象包含了“将要"执行的所有操作。我们不必担心内存或副本等问题。