How can I use PySpark's parallel operations to store each row of a PySpark DataFrame into a custom Python object?

cwtwac6a · asked 2021-05-27 · in Spark
'''
Part 1:
I have this Spark DataFrame. There are supposed to be multiple feature
columns, but for the sake of simplicity I only have one here.
'''
sample_list = [("1", "feature_1", "yes"),
               ("2", "feature_2", "no"),
               ("3", "feature_3", "yes"),
               ("4", "feature_4", "no")]

df = spark.createDataFrame(sample_list, ["id", "features", "label"])

'''
Part 2:
I also have a self-defined Python Node object.
'''
class Node(object):
    def __init__(self, node_id, feature_vector, label):
        '''
        self.node_id: string
            Unique id of a node.

        self.feature_vector (X): list
            A list of features.

        self.label (Y): string
            Corresponding Y label.
        '''
        self.node_id = node_id
        self.feature_vector = feature_vector
        self.label = label

'''
Part 3:
I did the following to store each row of the df into its own
Node object.
'''

for row in df.rdd.collect():
    node_object = Node(row[0], list(row[1]), row[2])
    # next step is to add each node_object into a graph object

This df.rdd.collect() is fine when working with, say, a small DataFrame (thousands of rows), but once the DataFrame has millions of rows it is no longer good practice.
So I am wondering whether someone could show me an example of how to achieve the above in a Spark way, e.g. by taking advantage of Spark's parallel operations (or whether this is even feasible with PySpark). I am new to Spark, so a clear example, ideally based on the code above, would be great.
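For reference, here is a rough sketch of what I imagine a parallel version might look like (I am not sure this is correct or idiomatic; it assumes the Node class is picklable and available on the workers, and the names nodes_rdd and build_partial_graph are just placeholders I made up):

# Construct the Node objects on the executors instead of collecting
# raw rows to the driver first.
nodes_rdd = df.rdd.map(lambda row: Node(row[0], list(row[1]), row[2]))

# Option A: stream the nodes back one partition at a time, so the driver
# never holds the whole DataFrame in memory at once.
for node_object in nodes_rdd.toLocalIterator():
    pass  # add node_object to the graph object here

# Option B: build a partial graph per partition on the executors and
# merge them afterwards.
def build_partial_graph(node_iter):
    partial = list(node_iter)  # placeholder: a real graph structure would go here
    yield partial              # one partial graph per partition

partial_graphs = nodes_rdd.mapPartitions(build_partial_graph).collect()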

No answers yet.
