“如何才能超过PostgreSQL和Spark JDBC中的最大连接数

bprjcwpo  于 2023-05-23  发布在  Apache
关注(0)|答案(1)|浏览(182)

我有一个至少有两个表的PostgreSQL数据库。
我想测试超出最大连接数的情况。但是,由于某种原因,我无法达到预期的结果,我所有的运行都是成功的。我在试着理解为什么。
当我通过运行'show max_connections'检查数据库中的连接数时,我得到的结果是100
现在,我尝试通过将'numOfPartitions'设置为200来运行Spark JDBC,例如200个连接。我希望收到一个例外。

val df = ScalaSession.reader.read
.format("jdbc")
.option("url", "jdbc:postgresql://localhost:5432/test")
.option("user", "test")
.option("dbtable", s"${param}")
.option("password", "test")
.option("fetchsize", "1000")
.option("partitionColumn", "id")
.option("lowerBound", "0")
.option("upperBound", "1000")
.option("numPartitions", "200")
.option("driver", "org.postgresql.Driver")
.load()

df.count()

场景二:
我试图通过创建2个线程来并行运行上面的代码,每个线程在不同的表上执行count查询。我的意图是运行第一个查询并利用所有连接,然后执行第二个线程来访问数据库。我希望遇到一个'连接限制超出'异常,但令我惊讶的是,什么也没有发生。在Spark UI中,我仍然可以观察到这两个作业是并行运行的

class MyThread(param: String) extends Runnable {

  override def run() {
    val df = ScalaSession.reader.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/test")
      .option("user", "test")
      .option("dbtable", s"${param}")
      .option("password", "test")
      .option("fetchsize", "1000")
      .option("partitionColumn", "id")
      .option("lowerBound", "0")
      .option("upperBound", "1000")
      .option("numPartitions", "200")
      .option("driver", "org.postgresql.Driver")
      .load()

    df.count()
  }

}

class MyThread2(param: String) extends Runnable {

  override def run() {
    val df = ScalaSession.reader.read
      .format("jdbc")
      .option("url", "jdbc:postgresql://localhost:5432/test")
      .option("user", "test")
      .option("dbtable", s"${param}")
      .option("password", "test")
      .option("fetchsize", "1000")
      .option("partitionColumn", "id")
      .option("lowerBound", "0")
      .option("upperBound", "1000")
      .option("numPartitions", "200")
      .option("driver", "org.postgresql.Driver")
      .load()

    df.count()

  }
}

// Creating object
object MainObject {
  // Main method
  def main(args: Array[String]) {
    val th1 = new Thread(new MyThread("atable"))
    th1.setName("1")
    th1.start()

    print("Go to sleep")
    Thread.sleep(5000)
    print("I am woke up")

    val th2 = new Thread(new MyThread2("btable"))
    th2.setName("2")
    th2.start()
  }
}

object ScalaSession {
  val reader = SparkSession
    .builder()
    .master("local[2]")
    .appName("spark test")
    .config("spark.sql.legacy.sizeOfNull", false)
    .getOrCreate()
}

如何重现“连接超出”异常?“

dgiusagp

dgiusagp1#

连接是短暂的,一旦spark执行器完成对数据库的请求,它就会自动关闭。
尝试添加一个更复杂的过程,以增加连接时间。

相关问题