AWS Glue job fails when merging small files with PySpark

Asked by mbzjlibv on 2023-01-04, in Spark

I'm new to Glue and PySpark. I'm trying to create a job that grabs a large number of small files and combines them into larger files. I understand that a pushdown predicate or a partition key would normally be the way around this, but the only partition key available in this data is a date column, and the files are already separated by that date. Some dates (and their corresponding files) are very small (typically 15KB-5MB). The files are organized in S3 into one directory per table, and some of those directories are large (40GB+) and consist of thousands of these small files. The file format is Parquet. As a requirement, all columns need to be converted to strings (don't worry about why; just know it's a hard and fast requirement). Some of the top-level S3 keys (table directories) process just fine, but only the smaller ones. The larger keys error out every time. My guess is that the executors run out of memory, mainly because the larger top-level keys do manage to output some merged files, but only 13.8GB worth no matter how much data is being processed. I'm running this job on G.1X workers (16GB of memory) and can only assume it's running out of memory, although I don't see a specific OOM error in the CloudWatch logs. A G.2X job processes more files but eventually errors out as well, which further convinces me this is a memory problem. I've tried the G.1X job with anywhere from 10 to 150 DPUs, with the same result.
The job script is written in Python, and I've spent a long time trying to conserve memory. I tried reading all of the files into a single PySpark DataFrame with the schema I need, repartitioning it into roughly 128MB partitions (my desired final file size), converting the DataFrame into a DynamicFrame, and then writing the result to the Glue Catalog and writing out the files. In hindsight, the reason that failed seems obvious.
My latest iteration reads the list of file objects from a top-level S3 directory, gets their sizes, and builds a list of lists, where each inner list contains the keys of S3 objects whose sizes total roughly 1.28GB. Each chunk is then read into a PySpark DataFrame, repartitioned into 10 partitions (for 10 files of ~128MB), converted to a DynamicFrame, and written out. I first used this chunking approach in a regular function, which failed. I then created generators to produce the PySpark and Dynamic frames, but for whatever reason Python would not release the memory even with generators involved. That led me to try the old "run the function in a separate process" trick, which also failed because Glue workers are not picklable. My goal is to be able to run this one job to iterate over all of the top-level S3 keys, but as you can see in the sample code, for now I'm just trying to get one of the larger ones through.
I'm almost certain I'm just so green with Glue that I'm not even approaching this the way Glue wants me to. I've been searching SO, YouTube, and random internet guides (including AWS's documentation on Glue), and nothing has helped. I'm fully aware this code contains a number of less-than-ideal techniques. At this point I'm just trying to make some progress, so please try to keep your feedback focused on the question at hand, which is: **How do I combine many small files into a series of larger, fixed-size files in a Glue job when the total size of the data being processed is larger than the amount of memory available to the job?** Below is the code I'm trying and the error I'm getting. Other details are available on request. Thank you very much for any help you can provide!

**Code**
import sys
import math
import boto3
from concurrent.futures import ProcessPoolExecutor
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.sql.types import StringType, StructType, StructField

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
s3_resource = boto3.resource('s3')
client = boto3.client('s3')

input_bucket = "test-datastore"
output_bucket = "test-unified-datastore"
num_of_partitions = 10

def get_s3_dirs(input_bucket):
    paginator = client.get_paginator('list_objects')
    result = paginator.paginate(Bucket=input_bucket, Delimiter='/')
    dirs = [prefix.get('Prefix').strip('/') for prefix in result.search('CommonPrefixes')]
    return dirs

def get_file_meta(input_bucket, input_key):
    bucket_obj = s3_resource.Bucket(input_bucket)
    for obj in bucket_obj.objects.filter(Prefix=input_key):
        file_meta = {'path': f's3://{input_bucket}/{obj.key}', 'size': obj.size}
        yield file_meta

def get_chunk_size():
    return math.ceil(num_of_partitions * 128000000)

def create_chunks(input_key):
    sized_chunks = []
    target_chunk_size = get_chunk_size()
    chunks_sized = False
    file_meta = get_file_meta(input_bucket, input_key)
    while not chunks_sized:
        current_chunk = []
        current_chunk_size = 0
        while current_chunk_size < target_chunk_size:
            try:
                meta = next(file_meta)
                file_path = meta['path']
                file_size = meta['size']
                current_chunk_size += file_size
                current_chunk.append(file_path)
            except StopIteration:
                if current_chunk:
                    # if the current chunk is not the only chunk and its smaller than
                    # 100MB, merge it with the last chunk in sized_chunks
                    if sized_chunks and current_chunk_size < (1048576 * 100):
                        current_chunk.extend(sized_chunks.pop(-1))
                chunks_sized = True
                break
        sized_chunks.append(current_chunk)
    return sized_chunks

def gen_create_spark_df(schema, chunk, num_of_partitions):
    spark_df = spark.read.schema(schema).parquet(*chunk).repartition(num_of_partitions)
    yield spark_df

def gen_process_chunks(chunks):
    for chunk in chunks:
        sample_file = chunk[0]
        sample_df = spark.read.parquet(sample_file)
        schema = sample_df.schema
        struct_fields = []
        for field in schema.fields:
            schema_field = StructField(
                field.name,
                StringType(),
                field.nullable
            )
            struct_fields.append(schema_field)
        schema = StructType(struct_fields)
        output_dynamic = DynamicFrame.fromDF(
            next(gen_create_spark_df(schema, chunk, num_of_partitions)), 
            glueContext, "amazonS3_consolidator_3"
        )
        yield output_dynamic

def merge_table(input_key):
    chunks = create_chunks(input_key)
    dfs = gen_process_chunks(chunks)
    for df in dfs:
        AmazonS3_node1670429555763 = glueContext.getSink(
            path=f"s3://{output_bucket}/{input_key}/",
            connection_type="s3",
            updateBehavior="UPDATE_IN_DATABASE",
            partitionKeys=[],
            enableUpdateCatalog=True,
            transformation_ctx="AmazonS3_node1670429555763",
        )
        AmazonS3_node1670429555763.setCatalogInfo(
            catalogDatabase="test_db", catalogTableName=input_key.lower()
        )
        AmazonS3_node1670429555763.setFormat("glueparquet")
        AmazonS3_node1670429555763.writeFrame(df)
        

s3_dirs = get_s3_dirs(input_bucket)[7:8]
for input_key in s3_dirs:
    merge_table(input_key)
**Error**
Py4JJavaError: An error occurred while calling o200.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 24 in stage 26.0 failed 4 times, most recent failure: Lost task 24.3 in stage 26.0 (TID 1928, 172.35.39.79, executor 19): ExecutorLostFailure (executor 19 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at com.amazonaws.services.glue.sinks.GlueParquetHadoopWriter.doParquetWrite(GlueParquetHadoopWriter.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1$$anonfun$4.apply$mcV$sp(HadoopDataSink.scala:243)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1$$anonfun$4.apply(HadoopDataSink.scala:235)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1$$anonfun$4.apply(HadoopDataSink.scala:235)
    at scala.util.Try$.apply(Try.scala:192)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:235)
    at com.amazonaws.services.glue.sinks.HadoopDataSink$$anonfun$writeDynamicFrame$1.apply(HadoopDataSink.scala:149)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)
    at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:82)
    at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:89)
    at com.amazonaws.services.glue.sinks.HadoopDataSink.writeDynamicFrame(HadoopDataSink.scala:148)
    at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:65)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)

I've tried reading an entire top-level key into a DynamicFrame, reading the files under a top-level key in chunks and processing each chunk as its own DynamicFrame, chunking the files into DynamicFrames created by a generator function, and chunking the files into a DynamicFrame created in a separate Python Process. I've followed AWS Glue guides from AWS and from third parties, in both blog and YouTube-video form.
I followed this guide from AWS: https://aws.amazon.com/blogs/big-data/best-practices-to-scale-apache-spark-jobs-and-partition-data-with-aws-glue/
and reviewed this SO question: AWS Glue job running out of memory
and found no relevant help.
I expected the Glue job to combine the large number of small files into a smaller subset of larger files.
Regardless of the technique used, I get an error telling me the remote RPC client disassociated, likely due to containers exceeding thresholds or network issues.

Answer from evrscar2:

Create a new Glue crawler that reads the small Parquet files and creates the corresponding Glue Data Catalog table.
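
A minimal sketch of that step with boto3 (the crawler name, IAM role, database, and S3 path below are placeholders):

import boto3

glue_client = boto3.client('glue')

# Point a crawler at the prefix that holds the small Parquet files
# (all names and the path below are placeholders)
glue_client.create_crawler(
    Name='small-parquet-crawler',
    Role='my-glue-service-role',
    DatabaseName='test_db',
    Targets={'S3Targets': [{'Path': 's3://my-bucket/my-table/'}]}
)

# Run it once to populate the Data Catalog table
glue_client.start_crawler(Name='small-parquet-crawler')
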
Create a Glue ETL (extract, transform, and load) job that reads the data from the small Parquet files, performs any necessary transformations, and writes the data to larger Parquet files in a different location in S3. Set up a Glue trigger to run the ETL job on a schedule or in response to certain events (a sketch of a scheduled trigger follows the job-creation code below).
Here is some sample Python code that demonstrates how you could use the Glue API to create and run a Glue ETL job that combines small Parquet files into larger ones:

import boto3

# Create a Glue client
glue_client = boto3.client('glue')

# Set the name of the Glue ETL job and the Glue Data Catalog table that it reads from
job_name = 'my-etl-job'
table_name = 'my-table'

# Set the name of the output S3 bucket and prefix where the large Parquet files will be written
output_bucket = 'my-output-bucket'
output_prefix = 'output/'

# Define the Glue ETL job. Name, Role, and Command are the parameters that
# boto3's create_job expects; the input/output locations are handed to the
# script through DefaultArguments.
job_properties = {
    'Name': job_name,
    'Role': 'my-glue-service-role',  # IAM role the job runs as
    'Command': {
        'Name': 'pythonshell',
        'PythonVersion': '3',
        'ScriptLocation': 's3://my-scripts/merge_parquet.py'
    },
    'DefaultArguments': {
        '--input_table': table_name,
        '--output_path': f's3://{output_bucket}/{output_prefix}'
    },
    'ExecutionProperty': {'MaxConcurrentRuns': 1}
}

# Create the Glue ETL job
glue_client.create_job(**job_properties)

# Run the Glue ETL job
glue_client.start_job_run(JobName=job_name)
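
And, for the scheduled trigger mentioned above, a minimal sketch building on the same glue_client and job_name (the trigger name and cron expression are placeholders):

# Run the merge job every night at 02:00 UTC; name and schedule are placeholders
glue_client.create_trigger(
    Name='merge-small-parquet-nightly',
    Type='SCHEDULED',
    Schedule='cron(0 2 * * ? *)',
    Actions=[{'JobName': job_name}],
    StartOnCreation=True
)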

The merge_parquet.py script specified in the ScriptLocation field is a Python script that reads the small Parquet files, performs any necessary transformations, and writes the data out to larger Parquet files. You would need to implement this script yourself, for example using the pandas library to read and write the Parquet files and the pyarrow library to perform the merge.
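
For illustration, a minimal pandas-based sketch of what such a merge_parquet.py could look like (the paths are placeholders and are hardcoded for brevity; in practice they would come from the DefaultArguments set above, and reading s3:// URLs from pandas requires s3fs):

import pandas as pd

# Input prefix holding the small files and the output location for the merged
# file (placeholder paths)
input_path = 's3://my-bucket/input/'
output_path = 's3://my-bucket/output/merged.parquet'

# Read every small Parquet file under the input prefix into one DataFrame
df = pd.read_parquet(input_path)

# Write the combined data back out as a single, larger Parquet file
df.to_parquet(output_path, index=False)
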
And a couple of pyarrow examples:

import pyarrow as pa
import pyarrow.parquet as pq

# Set the input and output filenames
input_filenames = ['file1.parquet', 'file2.parquet', 'file3.parquet']
output_filename = 'merged.parquet'

# pyarrow has no single "merge files" call, so the merge is read + concat + write
tables = [pq.read_table(filename) for filename in input_filenames]
pq.write_table(pa.concat_tables(tables), output_filename)

import pyarrow.parquet as pq
from pyarrow import fs

# Use an explicit S3 filesystem; paths are then given as "bucket/prefix"
s3 = fs.S3FileSystem()
input_path = 'my-bucket/input/'
output_path = 'my-bucket/output/merged.parquet'

# Read every Parquet file under the input prefix as a single table
table = pq.ParquetDataset(input_path, filesystem=s3).read()

# Write the combined table back to S3 as one larger file
pq.write_table(table, output_path, filesystem=s3)

Alternatively, you could simply create a Spark job in an Athena notebook or on EMR, where you have better control over memory/CPU, and use the code below:

import boto3
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Set the input and output paths
input_bucket = 'my-bucket'
input_prefix = 'input/'
input_path = f's3://{input_bucket}/{input_prefix}'
output_path = 's3://my-bucket/output/'

# Read the small Parquet files and merge them into a single DataFrame
df = spark.read.option("mergeSchema", "true").parquet(input_path)

# Calculate the number of ~128MB partitions needed by summing the sizes of the
# input objects (a DataFrame's schema does not expose a size in bytes)
s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
total_bytes = sum(
    obj['Size']
    for page in paginator.paginate(Bucket=input_bucket, Prefix=input_prefix)
    for obj in page.get('Contents', [])
)
num_partitions = max(1, int(total_bytes // (128 * 1024 * 1024)))

# Repartition the DataFrame so each partition targets roughly 128MB
df_repartitioned = df.repartition(num_partitions)

# Write the repartitioned DataFrame out as larger Parquet files
df_repartitioned.write.mode('overwrite').parquet(output_path)
