如何检查数据是否缓存在MySQL中,或者由于Pyspark中的延迟执行而尚未缓存?

ss2ws0br  于 11个月前  发布在  Spark
关注(0)|答案(2)|浏览(114)

我的问题和其他关于堆栈溢出的问题没有什么不同,我需要知道数据是否已经被检索并存储在一个堆栈中,或者还没有发生
我正在做这样的事情

df1=spark.table("sourceDB.Table1")
df1.cache()

字符串
现在,你可能已经意识到,由于延迟执行,数据还没有从源表中读取。所以我需要一个表达式,在这里将结果表示为“False”。
过了一段时间,我正在做一些操作,需要从源代码中检索数据。

df1.groupBy("col3").agg(sum("col1").alias("sum_of_col1")).select("sum_of_col1","col3").filter("sum_of_col1 >= 100").show()


在这一点上,数据必须已经读取并存储在df1的缓存中。所以我需要在这里有一个表达式,在这一点上将结果表示为“True”。
有什么方法可以做到这一点吗?我相信df1.is_cached在这种情况下不会有帮助

ee7vknir

ee7vknir1#

也许这是有用的

1.如果您想检查是否已经在框架上触发了cache/persist,则可以使用cachemanager进行确认,如下所示-

spark.sharedState.cacheManager.lookupCachedData(df.queryExecution.logical).nonEmpty

字符串

2.如果你想检查数据是否在内存中,下面的方法可能会有帮助-

def checkIfDataIsInMemory(df: DataFrame): Boolean = {
      val manager = df.sparkSession.sharedState.cacheManager
      // step 1 - check if the dataframe.cache is issued earlier or not
      if (manager.lookupCachedData(df.queryExecution.logical).nonEmpty) {// cache statement was already issued
        println("Cache statement is already issued on this dataframe")
        // step-2 check if the data is in memory or not
        val cacheData = manager.lookupCachedData(df.queryExecution.logical).get
        cacheData.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
      } else false
    }

3.测试上述方法-

val df = spark.read
      .parquet(getClass.getResource("/parquet/plain/part-00000-4ece3595-e410-4301-aefd-431cd1debf91-c000.snappy" +
        ".parquet").getPath)
    println(checkIfDataIsInMemory(df))
    /**
      * false
      */
    
    df.cache()
    // check if the data is cached
    println(checkIfDataIsInMemory(df))
    /**
      * Cache statement is already issued on this dataframe
      * false
      */
    
    println(df.count())
    println(checkIfDataIsInMemory(df))

    /**
      * 1
      * Cache statement is already issued on this dataframe
      * true
      */

qyzbxkaa

qyzbxkaa2#

pyspark版本(基于answer from @som

def is_cached(df: DataFrame) -> bool:
    jspark: Any = df.sparkSession._jsparkSession
    jdf: Any = df._jdf
    plan = jdf.queryExecution().logical()
    cache = jspark.sharedState().cacheManager().lookupCachedData(plan)
    return (
        cache.nonEmpty() and
        cache.get().cachedRepresentation().cacheBuilder().isCachedColumnBuffersLoaded()
    )

df = spark.createDataFrame([Row(id=1)])
is_cached(df) # False
df.count()
is_cached(df) # False
df.cache()
is_cached(df) # False
df.count()
is_cached(df) # True

字符串

相关问题