下面是在线程中运行 Spark 代码的示例:通过线程,我们可以并行运行多个 Spark 任务。
把查询传递给 Spark DataFrame 执行,这种方式将来有助于并行运行多个查询。
我们可以在线程池(多线程)中运行 Spark:在循环中提交任务,即可让 Spark 程序以并行模式运行。
//import statements
import java.util.concurrent.Executors
import org.apache.spark.sql.SparkSession
import scala.io.Source.fromFile
//Defined class
object ParallerlExecution {
//main method start
/** Entry point: loads the list of queries from a file, then runs them in parallel
 *  on a thread pool via [[parallerlExecution]].
 */
def main(args: Array[String]): Unit = {
  // load the query file (one query per line)
  val queryList = loadFile()
  // start multi-threading — original had this phrase as bare text, which did not compile
  parallerlExecution(queryList)
}
//method to load file
/** Reads all lines of a text file into a list, closing the underlying source.
 *
 *  @param path file to read; defaults to "" (the original hard-coded placeholder —
 *              set a real path before running). Parameter added backward-compatibly.
 *  @return the file's lines, in order
 */
def loadFile(path: String = ""): List[String] = {
  val source = fromFile(path)
  // Original leaked the Source handle; close it even if reading throws.
  try source.getLines().toList
  finally source.close()
}
//defined method to multithreading
/** Runs each query in `queryList` concurrently on a fixed pool of 3 threads.
 *  Each task reads the query result via Spark's JDBC source and prints its row count
 *  with start/end timestamps.
 *
 *  NOTE(review): `url` is declared but unused — the JDBC URL below is hard-coded to
 *  "jdbc:postgresql:dbserver"; confirm which one is intended.
 *
 *  @param queryList queries (used as the JDBC `dbtable` option: a table name or subquery)
 */
def parallerlExecution(queryList: List[String]): Unit = {
  // Create (or reuse) the Spark session; it is shared by all worker threads.
  val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
  val url = ""
  val username = ""
  val password = ""
  // Fixed-size pool: at most 3 queries run in parallel.
  val pool = Executors.newFixedThreadPool(3)
  // Submit one task per query.
  for (query <- queryList) {
    val r = new Runnable {
      override def run(): Unit = {
        val st = System.currentTimeMillis()
        // Read from the SQL database over JDBC.
        val df = spark.read
          .format("jdbc")
          .option("url", "jdbc:postgresql:dbserver")
          .option("dbtable", query)
          .option("user", username)
          .option("password", password)
          .load()
        val count = df.count
        val et = System.currentTimeMillis()
        // Print the final count with timing information.
        println("Thread" + Thread.currentThread().getId() + " Record Count " + count + " StartTime " + st + " Endtime " + et + " Query: " + query)
      }
    }
    pool.execute(r)
  }
  // Original never shut the pool down: its non-daemon worker threads keep the JVM
  // alive after main returns. shutdown() lets already-submitted tasks finish, then
  // stops the workers. (The stray "enter code here" scrape artifact was removed.)
  pool.shutdown()
}
暂无答案!
目前还没有任何答案,快来回答吧!