我试图将一个函数(如果满足某个条件,它会更新dynamodb表中的记录)Map到pyspark中的一个大Dataframe。我知道函数会被pickle并发送给执行者,但是我已经阅读了无数的例子,其中的解决方法是将map函数插入全局范围。不幸的是,这对我不起作用。
def update_dynamodb(rows, dynamodb_tb_name, s3_bucket_name, region):
dynamodb_table = boto3.resource('dynamodb', region_name = region).Table(dynamodb_tb_name)
s3_bucket = boto3.resource('s3', region_name = region).Bucket(s3_bucket_name)
for row in rows:
# code that modifies Dynamodb is here....
dynamodb_write_df = df.repartition(num_executors * 2)
dynamodb_write_df.rdd.foreachPartition(lambda x: update_dynamodb(x, dynamodb_tb_name, raw_s3_bucket, region))
此代码产生错误:
_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o81.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist
在这条线上:
dynamodb_write_df.rdd.foreachPartition(lambda x: update_dynamodb(x, eviv_dynamodb_tb, raw_s3_bucket, region))
暂无答案!
目前还没有任何答案,快来回答吧!