我正在创建一个胶水作业,需要处理来自s3路径的4tb的每日数据量- s3://<path>/<year>/<month>/<day>/<hour>/
. 因此,我创建了一个循环,通过每小时一次的文件夹(每个155gb)将数据读入spark df,对某些类别进行过滤,并将其作为按过滤类别划分的Parquet文件写回s3( s3://<path>/category=<category>/year=<year>/month=<month>/day=<day>/hour=<hour>/
). 我使用60个g2.x工作节点,每个节点都有8个vcpu、32 gb内存、128 gb磁盘。s3写操作非常慢,需要10个多小时才能完成运行。除了增加节点数量之外,还有没有其他方法可以加速/优化s3写操作?
def s3_load_job(input_list):
hour, year, month, day = input_list
logger.info(f"hour in s3 func {hour}")
# get data from s3
s3_path = f"s3://<path>/{year}/{month}/{day}/{hour}/"
logger.info(f"print s3 path {s3_path}")
#user defined library function that return spark df
df = get_df_from_s3(glueContext, s3_path)
df = df.withColumn('category', F.lower(F.col('category')))
df_rep = df.where(F.col('category').isin({ "A", "B", "C","D"}))
#write to s3
datasink4 = DynamicFrame.fromDF(df_rep, glueContext, "datasink4")
glueContext.write_dynamic_frame.from_options(frame = datasink4,
connection_type = "s3",
connection_options =
{"path":"s3://<path>/"
,"partitionKeys"["category","year","month","day","hour"]}
,format = "glueparquet" )
def main():
year = '2020'
month = '08'
day = '01'
hours = ["%.2d" % i for i in range(24)]
input_list = [[hour, year, month, day] for hour in hours]
logger.info(f"input_list {input_list}")
for i in input_list:
s3_load_job(i)
job.commit()
if __name__ == "__main__":
main()
2条答案
按热度按时间ltqd579y1#
如果您使用的是s3(对象存储),请尝试设置以下配置:
4ktjp1zp2#
您可以尝试以下方法
不要将pyspark df转换为dynamicframe,因为可以直接将pysparkDataframe保存到s3。
因为你得到的文件大小为1mb到15mb,你需要做优化。因此,在将Dataframe写入s3之前,请尝试重新划分Dataframe。
如果分区大小为250 gb,则应创建大小至少为256 mb的输出文件,如果是g2.x,则还可以创建每个大小为512 mb的文件。
要做到这一点,你可以做到
您可以在每个分区中生成500个文件
500*512 = 250 GB
```df.repartition(500,partitionCol).write.partitionBy(partitionCol).parquet(path)