我在处理scala未来列表中的异常时遇到问题。我在打电话 getQC_report(qcArgsThread,spark)
方法内 ruuner
处理输入文件并保存在配置单元表中的方法。下面的代码
import scala.util.{Failure, Success}
import scala.concurrent._
import scala.concurrent.duration._
val spark = SparkSession.builder.master("yarn").enableHiveSupport().getOrCreate()
var argsList: List[Array[String]] = List[Array[String]]()
for(ip_file <- INPUT_FILE.asScala.toList) {
var qcArgs:Array[String] = null
qcArgs = Array("input_file", ip_file,
"hiveDB",hiveDB,
"Outputhive_table",Outputhive_table)
argsList = qcArgs :: argsList
}
var pool = 0
def poolId = {
pool = pool + 1
pool
}
def runner(qcArgsThread: Array[String]) = Future {
sc.setLocalProperty("spark.scheduler.pool", poolId.toString)
getQC_report(qcArgsThread,spark)
}
val futures = argsList map(i => runner(i))
futures foreach(f => Await.ready(f, Duration.Inf))
futures.onComplete {
case Success(x) => {
println(s"\nresult = $x")
}
case Failure(e) => {
System.err.println("Failure happened!")
System.err.println(e.getMessage)
}
}
我犯了个错误 futures.onComplete
线路。
错误-无法在完成时解析符号。
请帮助我改进代码,因为我是新使用scala期货。谢谢!
1条答案
按热度按时间4ioopgfo1#
简单的回答是因为
argsList
是一个List[Array[String]]
```val futures = argsList map(i => runner(i))
// replaces all code after val futures = argsList map ...
val allFutures = Future.sequence(futures)
val result: List[WhateverGetQC_ReportReturns] =
try {
Await.result(allFutures, Duration.Inf)
} catch {
case NonFatal(e) =>
System.err.println("Failure happened!")
System.err.println(e.getMessage)
}