在pyspark中是否可以对python对象使用parallelize函数?我想在对象列表上并行运行,使用函数修改它们,然后打印这些对象。
def init_spark(appname):
spark = SparkSession.builder.appName(appname).getOrCreate()
sc = spark.sparkContext
return spark,sc
def run_on_configs_spark(object_list):
spark,sc = init_spark(appname="analysis")
p_configs_RDD = sc.parallelize(object_list)
p_configs_RDD=p_configs_RDD.map(func)
p_configs_RDD.foreach(print)
def func(object):
return do-somthing(object)
当我运行上面的代码时,我遇到了一个错误“attributeerror:cannot get attribute'object'on<module'pyspark.daemon'from…>”。我该怎么解决?
我做了以下的变通方法。但我不认为这是一个好的解决方案,一般来说,它假设我可以改变对象的构造函数。
我已将对象转换为字典,并从目录中解释对象。
def init_spark(appname):
spark = SparkSession.builder.appName(appname).getOrCreate()
sc = spark.sparkContext
return spark,sc
def run_on_configs_spark(object_list):
spark,sc = init_spark(appname="analysis")
p_configs_RDD = sc.parallelize([x.__dict__() for x in object_list])
p_configs_RDD=p_configs_RDD.map(func)
p_configs_RDD.foreach(print)
def func(dict):
object=CreateObject(create_from_dict=True,dictionary=dict)
return do-something(object)
在对象的构造函数中:
class Object:
def __init__(create_from_dict=False,dictionary=None, other_params...):
if(create_from_dict):
self.__dict__.update(dictionary)
return
有没有更好的解决办法?
1条答案
按热度按时间n3schb8v1#
为了得到更好的答案,我建议你发布一个对象列表的示例和你想要的输出,这样我们就可以用真实的代码进行测试了。
根据pyspark docs(如上所述)parallelize函数应该接受任何集合,所以我认为问题可能是对象列表。我看到解决方法可以工作,因为输入类型是字典(或其他Map对象)的列表
至于一种模块化的方法来运行一般创建的对象,这取决于您希望rdd是如何的,但是一般的方法应该是将您想要的整个对象转换成一个集合类型的对象。一个不修改构造函数/结构的解决方案可以是
sc.parallelize([对象列表])
关键是要确保输入是集合类型。