我有一个至少有两个表的PostgreSQL数据库。
我想测试超出最大连接数的情况。但是,由于某种原因,我无法达到预期的结果,我所有的运行都是成功的。我在试着理解为什么。
当我通过运行'show max_connections'
检查数据库中的连接数时,我得到的结果是100
。
现在,我尝试通过将'numOfPartitions'设置为200来运行Spark JDBC,例如200个连接。我希望收到一个例外。
val df = ScalaSession.reader.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "test")
.option("dbtable", s"${param}")
.option("password", "test")
.option("fetchsize", "1000")
.option("partitionColumn", "id")
.option("lowerBound", "0")
.option("upperBound", "1000")
.option("numPartitions", "200")
.option("driver", "org.postgresql.Driver")
.load()
df.count()
场景二:
我试图通过创建2个线程来并行运行上面的代码,每个线程在不同的表上执行count查询。我的意图是运行第一个查询并利用所有连接,然后执行第二个线程来访问数据库。我希望遇到一个'连接限制超出'异常,但令我惊讶的是,什么也没有发生。在Spark UI中,我仍然可以观察到这两个作业是并行运行的
class MyThread(param: String) extends Runnable {
override def run() {
val df = ScalaSession.reader.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "test")
.option("dbtable", s"${param}")
.option("password", "test")
.option("fetchsize", "1000")
.option("partitionColumn", "id")
.option("lowerBound", "0")
.option("upperBound", "1000")
.option("numPartitions", "200")
.option("driver", "org.postgresql.Driver")
.load()
df.count()
}
}
class MyThread2(param: String) extends Runnable {
override def run() {
val df = ScalaSession.reader.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "test")
.option("dbtable", s"${param}")
.option("password", "test")
.option("fetchsize", "1000")
.option("partitionColumn", "id")
.option("lowerBound", "0")
.option("upperBound", "1000")
.option("numPartitions", "200")
.option("driver", "org.postgresql.Driver")
.load()
df.count()
}
}
// Creating object
object MainObject {
// Main method
def main(args: Array[String]) {
val th1 = new Thread(new MyThread("atable"))
th1.setName("1")
th1.start()
print("Go to sleep")
Thread.sleep(5000)
print("I am woke up")
val th2 = new Thread(new MyThread2("btable"))
th2.setName("2")
th2.start()
}
}
object ScalaSession {
val reader = SparkSession
.builder()
.master("local[2]")
.appName("spark test")
.config("spark.sql.legacy.sizeOfNull", false)
.getOrCreate()
}
如何重现“连接超出”异常?“
1条答案
按热度按时间dgiusagp1#
连接是短暂的,一旦spark执行器完成对数据库的请求,它就会自动关闭。
尝试添加一个更复杂的过程,以增加连接时间。