apachespark

oknrviil  于 2021-05-27  发布在  Spark
关注(0)|答案(1)|浏览(599)

我在处理scala未来列表中的异常时遇到问题。我在打电话 getQC_report(qcArgsThread,spark) 方法内 ruuner 处理输入文件并保存在配置单元表中的方法。下面的代码

import scala.util.{Failure, Success}
  import scala.concurrent._
  import scala.concurrent.duration._
  val spark = SparkSession.builder.master("yarn").enableHiveSupport().getOrCreate()

  var argsList: List[Array[String]] = List[Array[String]]()
  for(ip_file <- INPUT_FILE.asScala.toList) {
    var qcArgs:Array[String] = null
       qcArgs = Array("input_file", ip_file,
        "hiveDB",hiveDB,
        "Outputhive_table",Outputhive_table)
    argsList = qcArgs :: argsList
  }
  var pool = 0

  def poolId = {
    pool = pool + 1
    pool
  }

  def runner(qcArgsThread: Array[String]) = Future {
    sc.setLocalProperty("spark.scheduler.pool", poolId.toString)
    getQC_report(qcArgsThread,spark)
    }
  val futures = argsList map(i => runner(i))

  futures foreach(f => Await.ready(f, Duration.Inf))

  futures.onComplete {
    case Success(x) => {
      println(s"\nresult = $x")
    }
    case Failure(e) => {
      System.err.println("Failure happened!")
      System.err.println(e.getMessage)
    }
  }

我犯了个错误 futures.onComplete 线路。
错误-无法在完成时解析符号。
请帮助我改进代码,因为我是新使用scala期货。谢谢!

4ioopgfo

4ioopgfo1#

简单的回答是因为 argsList 是一个 List[Array[String]] ```
val futures = argsList map(i => runner(i))

将有类型 `List[Future[WhateverGetQC_ReportReturns]]` . 具体来说,它不是一个 `Future` ,所以没有 `onComplete` 方法。
如果你想有一个 `Future` 当所有的未来都完成的时候, `Future.sequence` 将转换为 `List[Future[T]]` 变成一个 `Future[List[T]]` :

// replaces all code after val futures = argsList map ...
val allFutures = Future.sequence(futures)

val result: List[WhateverGetQC_ReportReturns] =
try {
Await.result(allFutures, Duration.Inf)
} catch {
case NonFatal(e) =>
System.err.println("Failure happened!")
System.err.println(e.getMessage)
}

相关问题