在我开始提问之前,先介绍一下背景知识:
我参与了一个使用自举spark应用程序的项目,因此,我们有很多专门用于sparksession初始化的逻辑。因此,对于单元测试,我们启动/关闭许多会话。
我们的ci构建最近确实特别慢/有缺陷,经过一些调查,我们发现由于失控的守护进程线程创建/持久性,我们正在达到资源限制。下面是一个简短的代码片段,再现了这个问题:
import java.io.File
import java.nio.file.Files
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.scalatest.FunSuite
class TestSessionCleanupTest extends FunSuite {
private def createAndStopSparkSession(): Unit = {
setUpDerby()
val session = SparkSession.builder()
.appName("session_cleanup_test")
.master("local")
.enableHiveSupport()
.getOrCreate()
session.sql("show databases")
session.stop()
}
private def setUpDerby(): Unit = {
val tempDir = Files.createTempDirectory("derby")
tempDir.toFile.deleteOnExit() // deleting eagerly after session.stop() makes no difference
System.setProperty("derby.system.home", tempDir.toString)
}
test("Start/stop a bunch of sessions") {
Range(1,50).foreach { idx =>
println(s"Starting session $idx")
createAndStopSparkSession()
println(s"Stopped session $idx")
}
}
}
使用JavaVisualVM,您可以看到线程计数和堆大小随着时间的推移而稳步增加:线程和堆状态
似乎没有得到清理/增长的特定守护程序线程包括:
BoneCP-keep-alive-scheduler BoneCP-pool-watch-thread
com.google.common.base.internal.Finalizer derby.rawStoreDaemon
(少了一些,但还是一堆)
当然,如果我把 spark.sql("show databases")
语句,则不会发生此行为。因此,这显然是jdbc连接池到嵌入式derby元存储的问题。
我在这个例子中使用了spark2.4.4,但是fwiw我们在spark3上也看到了这种行为。
name := "Test Session Clean Up"
scalaVersion := "2.11.12"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "2.4.4",
"org.apache.spark" %% "spark-sql" % "2.4.4",
"org.apache.spark" %% "spark-hive" % "2.4.4",
"org.scalatest" %% "scalatest" % "3.1.0" % Test
)
最后,我的问题是:
你知道为什么会这样吗?
有什么好办法吗?
有没有可能的创可贴/解决方法(而不是在所有单元测试之间共享单个会话/上下文)
如果可能的话,我们仍然希望使用嵌入式元存储,但是因为这只用于单元测试——而不是生产——我们可以接受一个黑客/不雅的解决方案(例如,我已经尝试过手动中断和/或终止相关线程,但没有运气)。
谢谢!
暂无答案!
目前还没有任何答案,快来回答吧!