我在一个6节点的集群上运行一个相对简单的spark程序,迭代索引列表,每次迭代大约需要2-3分钟。
一个好主意似乎是使用固定的线程池计数和公平的调度来并行索引列表: spark.scheduler.mode=FAIR
.
代码为:
val df1 = spark.read.parquet("....")
val df2 = spark.read.parquet("....")
val parIndices = indices.par
parIndices.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(3))
parIndices.foreach(index => {
val df = df1
.where($"id".between(index, index + 100))
.joinWith(df2, condition, "inner")
.map {
case (left: Type1, right: Type2) => (
left.id,
right.id,
left.distance(right)
)
}.toDF()
df.write.mode("overwrite").parquet(s"$basePath/index=$index")
})
然而,现在的情况是,集群的内存资源似乎没有得到释放,导致内存的线性消耗,并最终在处理1小时后占用集群ram。
值得注意的是,我在emr5.29和scala2.11.12上使用了spark2.4.4。
你知道这是什么原因吗?或者如何调试?
暂无答案!
目前还没有任何答案,快来回答吧!