s3上有一个按日期划分的大型Parquet表,我需要在每个分区上执行几个批处理etl。缓存原始表比较理想,因为etl将重复地位于同一数据上。
解决方案a:
for (dateStr <- dates) {
val df = spark.read.parquet("s3://blah/"+dateStr)
val df_trimmed = df.filter($"colA" === "cat") // this reduces the amount of data to cache
df_trimmed.cache()
df_trimmed
.filter(...)
.withColumn(...)
... // some other ETL1
.write
.parquet(...+dateStr) // file 1
df_trimmed
.filter(...)
.withColumn(...)
... // some other ETL2
.write
.parquet(...+dateStr) // file 2
df_trimmed
.filter(...)
.withColumn(...)
... // some other ETL3
.write
.parquet(...+dateStr) // file 3
df_trimmed.unpersist() // does this help?
}
解决方案b,加载整个表。数据元素没有放置一个冗余的数据列,而这些信息必须被重建。。。
var df = spark
.read
.parquet(dataPaths(0))
.filter($"colA" === "cat")
.withColumn("date", lit(dates(0)))
for (idx <- 1 until dataPaths.size) {
// this apparently causes memory issue
df = df.union(
spark
.read
.parquet(dataPaths(idx))
.filter($"colA" === "cat")
.withColumn("date", lit(dates(idx))
)
}
... ETLs, then write partitioned by date.
一种解决方案是可行的,但是它相当慢,在经过大约10个分区后挂起。解决方案b只适用于少量分区,然后内存错误弹出,如本文所述https://medium.com/@david.mudrauskas/looping-过Spark反模式e10ac54824a0
那么,在一个大表中循环并只重复缓存其中的一部分/分区的最佳实践是什么呢?
暂无答案!
目前还没有任何答案,快来回答吧!