spark:通过缓存高效地循环分区和etl

wqsoz72f  于 2021-05-18  发布在  Spark
关注(0)|答案(0)|浏览(328)

s3上有一个按日期划分的大型Parquet表,我需要在每个分区上执行几个批处理etl。缓存原始表比较理想,因为etl将重复地位于同一数据上。
解决方案a:

for (dateStr <- dates) {
  val df = spark.read.parquet("s3://blah/"+dateStr)
  val df_trimmed = df.filter($"colA" === "cat")  // this reduces the amount of data to cache
  df_trimmed.cache()

  df_trimmed
    .filter(...)
    .withColumn(...)
    ... // some other ETL1
    .write
    .parquet(...+dateStr)  // file 1

  df_trimmed
    .filter(...)
    .withColumn(...)
    ... // some other ETL2
    .write
    .parquet(...+dateStr) // file 2

  df_trimmed
    .filter(...)
    .withColumn(...)
    ... // some other ETL3
    .write
    .parquet(...+dateStr)  // file 3

  df_trimmed.unpersist()  // does this help?

}

解决方案b,加载整个表。数据元素没有放置一个冗余的数据列,而这些信息必须被重建。。。

var df = spark
  .read
  .parquet(dataPaths(0))
  .filter($"colA" === "cat")
  .withColumn("date", lit(dates(0)))

for (idx <- 1 until dataPaths.size) {
  // this apparently causes memory issue
  df = df.union(
         spark
           .read
           .parquet(dataPaths(idx))
           .filter($"colA" === "cat")
           .withColumn("date", lit(dates(idx))
       )  
}

... ETLs, then write partitioned by date.

一种解决方案是可行的,但是它相当慢,在经过大约10个分区后挂起。解决方案b只适用于少量分区,然后内存错误弹出,如本文所述https://medium.com/@david.mudrauskas/looping-过Spark反模式e10ac54824a0
那么,在一个大表中循环并只重复缓存其中的一部分/分区的最佳实践是什么呢?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题