使用pyspark aws粘合剂时,器械上没有剩余空间错误

aurhwmvo  于 2022-12-11  发布在  Spark
关注(0)|答案(1)|浏览(97)

我正在使用AWS glue将DynamoDB项提取到S3中。我使用pyspark读取所有项,并对从DynamoDB检索到的项进行glue和转换,然后写入S3。但我总是遇到错误“设备上没有剩余空间”。
我使用的worker类型是G.1X,每个workerMap到1个DPU(4个vCPU、16 GB内存、64 GB磁盘),dynamoDB的大小是6 GB。
基于AWS documentation在随机播放过程中,数据被写入磁盘并通过网络传输。因此,随机播放操作将绑定到本地磁盘容量如何以编程方式设置随机播放?请在下面找到我的示例代码,

from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import Map
from awsglue.transforms import Filter
from pyspark import SparkConf

conf = SparkConf()
glue_context = GlueContext(SparkContext.getOrCreate())


# mytable got id and uri
resources_table_dynamic_frame = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.input.tableName": "my_table",
        "dynamodb.throughput.read.percent": "0.4",
        "dynamodb.splits": "8"
    }
)

# Filter out rows whose ids are same
def filter_new_id(dynamicRecord):
    uri = dynamicRecord['Uri']
    uri_split = uri.split(":")
    # Get the internal ID
    internal_id = uri_split[1]
    print(dynamicRecord)

    if internal_id == dynamicRecord['id']:
        return False

    return True

# Keep only the items whose IDs are different.
resource_with_old_id = Filter.apply(
    frame=resources_table_dynamic_frame,
    f=lambda x: filter_new_id(x),
    transformation_ctx='resource_with_old_id'
)

glue_context.write_dynamic_frame_from_options(
    frame=resource_with_old_id,
    connection_type="s3",
    connection_options={"path": "s3://path/"},
    format="json"
)
64jmpszr

64jmpszr1#

我通过在OP中发布的代码中进行以下调整来解决这个问题。

resources_table_dynamic_frame = glue_context.create_dynamic_frame.from_options(
   connection_type="dynamodb",
   connection_options={
       "dynamodb.input.tableName": "my_table",
       "dynamodb.throughput.read.percent": "0.5",
       "dynamodb.splits": "200"
   },
   additional_options={
      "boundedFiles" : "30000"
   }
)

我按照AWS文档here中的建议添加了boundedFiles,并增加了dynamodb.splits以使其为我工作。

相关问题