如何减少使用aws胶水将Parquet文件写入s3所需的时间

pod7payv  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(478)

我正在创建一个胶水作业,需要处理来自s3路径的4tb的每日数据量- s3://<path>/<year>/<month>/<day>/<hour>/ . 因此,我创建了一个循环,通过每小时一次的文件夹(每个155gb)将数据读入spark df,对某些类别进行过滤,并将其作为按过滤类别划分的Parquet文件写回s3( s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/ ). 我使用60个g2.x工作节点,每个节点都有8个vcpu、32 gb内存、128 gb磁盘。s3写操作非常慢,需要10个多小时才能完成运行。除了增加节点数量之外,还有没有其他方法可以加速/优化s3写操作?

def s3_load_job(input_list):

    hour, year, month, day = input_list
    logger.info(f"hour in s3 func {hour}")

    # get data from s3
    s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
    logger.info(f"print s3 path {s3_path}")

    #user defined library function that return spark df
    df = get_df_from_s3(glueContext, s3_path)

    df = df.withColumn('category', F.lower(F.col('category')))

    df_rep = df.where(F.col('category').isin({ "A", "B", "C","D"}))

    #write to s3
    datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")

    glueContext.write_dynamic_frame.from_options(frame = datasink4,
                                                             connection_type = "s3",
                                                             connection_options = 
                                                             {"path":"s3://<path>/"
                                           ,"partitionKeys"["category","year","month","day","hour"]}
                                                             ,format = "glueparquet" )

def main():

    year = '2020'
    month = '08'
    day = '01'
    hours = ["%.2d" % i for i in range(24)]

    input_list = [[hour, year, month, day] for hour in hours]
    logger.info(f"input_list {input_list}")

    for i in input_list:
        s3_load_job(i)

    job.commit()

if __name__ == "__main__":
    main()
ltqd579y

ltqd579y1#

如果您使用的是s3(对象存储),请尝试设置以下配置:

spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored -> true
mapreduce.fileoutputcommitter.algorithm.version -> 2
4ktjp1zp

4ktjp1zp2#

您可以尝试以下方法
不要将pyspark df转换为dynamicframe,因为可以直接将pysparkDataframe保存到s3。
因为你得到的文件大小为1mb到15mb,你需要做优化。因此,在将Dataframe写入s3之前,请尝试重新划分Dataframe。
如果分区大小为250 gb,则应创建大小至少为256 mb的输出文件,如果是g2.x,则还可以创建每个大小为512 mb的文件。
要做到这一点,你可以做到
您可以在每个分区中生成500个文件 500*512 = 250 GB ```
df.repartition(500,partitionCol).write.partitionBy(partitionCol).parquet(path)

相关问题