picklingerror

byqmnocz  于 2021-07-09  发布在  Spark
关注(0)|答案(0)|浏览(157)

我试图将一个函数(如果满足某个条件,它会更新dynamodb表中的记录)Map到pyspark中的一个大Dataframe。我知道函数会被pickle并发送给执行者,但是我已经阅读了无数的例子,其中的解决方法是将map函数插入全局范围。不幸的是,这对我不起作用。

def update_dynamodb(rows, dynamodb_tb_name, s3_bucket_name, region):
    dynamodb_table = boto3.resource('dynamodb', region_name = region).Table(dynamodb_tb_name)
    s3_bucket = boto3.resource('s3', region_name = region).Bucket(s3_bucket_name)
    for row in rows:
         # code that modifies Dynamodb is here....

dynamodb_write_df = df.repartition(num_executors * 2)
dynamodb_write_df.rdd.foreachPartition(lambda x: update_dynamodb(x, dynamodb_tb_name, raw_s3_bucket, region))

此代码产生错误:

_pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o81.__getstate__. Trace:
py4j.Py4JException: Method __getstate__([]) does not exist

在这条线上:

dynamodb_write_df.rdd.foreachPartition(lambda x: update_dynamodb(x, eviv_dynamodb_tb, raw_s3_bucket, region))

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题