即使使用unpersist,spark内存缓存也会不断增加

vybvopom  于 2021-07-14  发布在  Spark
关注(0)|答案(1)|浏览(457)

我正在遍历3个大文件并执行一系列统计计算。
我每个执行器有55gb的可用内存,8v内核,除了1个内核和1个主节点之外,还有多达10个任务节点可用。
以下是我实际代码的伪代码:


# Load MyConfigMeta file- this is a small file and will be a couple of times in the code

MyConfigMeta=spark.read.parquet("s3://path/MyConfigMeta.parquet")
MyConfigMeta=MyConfigMeta.persist(StorageLevel.MEMORY_AND_DISK)

# Very Large timeseries files

modules=["s3://path/file1.parquet",
         "s3://path/file2.parquet",
         "s3://path/file3.parquet"]

for file in modules:
    out_filename=1
    df1=spark.read.parquet(file)
    df1=df1.join(MyConfigMeta, on=["key"], how="inner")

    #Find out latest column values based on Timestamp
    lim_max=df1.groupBy('key')\
    .agg(f.max('TIME_STAMP').alias('TIME_STAMP'))
    temp=df1.select('TIME_STAMP','key',''UL','LL')
    lim_max=lim_max.join(temp, on=['TIME_STAMP','key'], how="left")\
    .drop('TIME_STAMP')\
    .distinct()
    lim_max=lim_max.persist(StorageLevel.MEMORY_AND_DISK)

    df1=df1.drop('UL,'LL')\
    .join(lim_max, on=['key'], how="left")\
    withColumn('out_clip', when(col('RESULT').between(col('LL'),col('UL')), 0).otherwise(1))\

    df1=df1.persist(StorageLevel.MEMORY_AND_DISK) # This is a very large dataframe and will later be used for simulation

    df2=df1.filter(col('out_clip')==0)\
    .groupBy('key')\
    .agg(f.round(expr('percentile(RESULT, 0.9999)'),4).alias('UPPER_PERCENTILE'),
         f.round(expr('percentile(RESULT, 0.0001)'),4).alias('LOWER_PERCENTILE'))\
    .withColumn('pcnt_clip', when(col('RESULT').between(col('LOWER_PERCENTILE'),col('UPPER_PERCENTILE')), 0).otherwise(1))\
    .filter(col('pcnt_clip')==0)

    stats=df2.groupBy('key')\
    .agg(#Perform a bunch of statistical calculations (mean, avg, kurtosis, skew))
    stats=stats.join(lim_max, on=['key'], how="left") #get back the columns from lim_max

    lim_max=lim_max.unpersist()

    stats=stats.withColumn('New_UL', #formula to calculate new limits)\
    .withColumn('New_LL', #formula to calculate new limits)\
    .join(MyConfigMeta, on=['key'], how="left")

    #Simulate data
    df_sim=df1.join(stats, on=['key'], how="inner")\
    .withColumn('newOOC', when ((col('RESULT')<col('New_LL')) | (col('RESULT')>col('New_UL')), 1).otherwise(0))

    df3=df_sim.groupBy('key')\
    .agg(f.sum('newOOC').alias('simulated result'))

    #Join back with stats to get statistcal data, context data along with simulated data
    df4=df3.join(stats, on=['key'], how="inner")

    #Write output file
    df4.write.mode('overwrite').parquet("s3://path/sim_" +out_filename+ ".parquet")

    df1=df1.unpersist()
    spark.catalog.clearCache()

我的配置是6 executor-cores 以及 driver-cores ,41 GB executor-memory ,41 GB driver-memory ,14 GB spark.executor.memoryOverhead 以及 9 num executors。 当我查看ganglia中的内存图表时,我注意到第一个文件完成得很好,但是后续文件的计算失败了,因为它不断遇到丢失节点的问题 executorlostfailure(executor 5退出,与正在运行的任务无关)原因:容器标记为失败。诊断:丢失节点上释放的容器。 ![](https://i.stack.imgur.com/l1p37.png) 自从我取消持久化后,我本以为高速缓存会明显清除df1Dataframe和使用spark.catalog.clearCache()` . 但记忆似乎在不断增加而没有被清除。但是,如果我运行单独的文件,它似乎工作良好。

在这里,很大一部分内存被清除,只是因为10名执行者死亡并被列入黑名单。
有没有办法让记忆在Spark中冲走?还是我不断丢失节点的另一个原因?

fbcarpbf

fbcarpbf1#

可以使用以下函数刷新sparkcontext中的所有持久化数据集。它列出rdd并调用unpersist方法。在函数内部创建df时,它特别有用。

def unpersist_dataframes() -> None:
  for (id, rdd) in sc._jsc.getPersistentRDDs().items():
      rdd.unpersist()
      print("Unpersisted {} rdd".format(id))

为了监视持久化的Dataframe,请检查sparkui中的storage选项卡。不要担心ganglia stats中的空闲内存,实际上这可能是您的资源没有得到充分利用的迹象。spark明智地管理内存。
关于丢失的节点,如果您使用的是像databricks这样的托管服务,它将在集群的事件日志中显示终止节点的原因。

相关问题