Run Spark jobs in multiple threads using a Java Runnable pool

whhtz7ly · posted 2021-05-22 in Spark
Follow (0) | Answers (0) | Views (315)

Below is code for running Spark code inside threads, so that we can run Spark tasks in parallel. The example passes each query to a Spark DataFrame read; submitting the reads to a pool lets them run concurrently.
By defining a fixed thread pool and submitting one task per query in a loop, we can run the Spark program in parallel mode.

//import statements
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.sql.SparkSession

import scala.io.Source.fromFile
//defined object
object ParallelExecution {

//main method start
  def main(args: Array[String]): Unit = {
//load the query file
    val queryList = loadFile()
//start multithreading
    parallelExecution(queryList)
  }
//method to load the query file (path left blank here)
  def loadFile(): List[String] = {
    fromFile("").getLines().toList
  }
//method that runs the queries on a thread pool
  def parallelExecution(queryList: List[String]): Unit = {
//create spark session
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    val url = ""
    val username = ""
    val password = ""
//define thread pool with three worker threads
    val pool = Executors.newFixedThreadPool(3)
//submit one task per query
    for (query <- queryList) {

      val r = new Runnable {
        override def run(): Unit = {

          val st = System.currentTimeMillis()
//load from the sql db; note the JDBC dbtable option expects a table name or a
//parenthesized subquery with an alias, e.g. s"($query) AS t"
          val df = spark.read
            .format("jdbc")
            .option("url", "jdbc:postgresql:dbserver")
            .option("dbtable", query)
            .option("user", username)
            .option("password", password)
            .load()
          val count = df.count
          val et = System.currentTimeMillis()
//print the final count
          println("Thread " + Thread.currentThread().getId() + " Record Count " + count + " StartTime " + st + " Endtime " + et + " Query: " + query)

        }
      }
//hand the task to the pool
      pool.execute(r)

    }
//stop accepting new tasks and wait for the submitted ones to finish
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.HOURS)
  }
}
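The thread-pool pattern above can be sketched without Spark at all, which makes the concurrency part easier to see and test. This is a minimal sketch with hypothetical names (`PoolSketch`, `runAll`): it submits one `Runnable` per "query" to a fixed pool, then uses `shutdown()` plus `awaitTermination()` so the driver does not move on before the tasks complete; in the real job, the body of `run()` is where the `spark.read ... .load()` and `df.count` would go.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

object PoolSketch {
  // Submits one task per "query" to a fixed pool of 3 threads,
  // waits for all of them to finish, and returns how many completed.
  def runAll(queries: List[String]): Int = {
    val done = new ConcurrentLinkedQueue[String]() // thread-safe collector
    val pool = Executors.newFixedThreadPool(3)
    for (query <- queries) {
      pool.execute(new Runnable {
        override def run(): Unit = {
          // in the real job: spark.read.format("jdbc")...load() and df.count
          done.add(query)
        }
      })
    }
    pool.shutdown()                            // no new tasks accepted
    pool.awaitTermination(1, TimeUnit.MINUTES) // block until tasks finish
    done.size()
  }

  def main(args: Array[String]): Unit =
    println(runAll(List("q1", "q2", "q3", "q4"))) // prints 4
}
```

Without the `shutdown()`/`awaitTermination()` pair, the pool's non-daemon worker threads keep the JVM alive after `main` returns, which is why the original snippet never exits cleanly.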
