如何通过spark和scala并行查询从oncomplete成功案例返回值来构建Dataframe?

bqucvtff  于 2021-05-18  发布在  Spark
关注(0)|答案(1)|浏览(414)

如何从oncomplete success case返回值以便打印hello值?
目前,我成功地获得了具有正确值的查询响应,我可以在oncomplete success案例中处理它,以便打印正确的resp值。

import ing.wbaa.druid.SQLQuery

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

implicit val executionContext: ExecutionContext = ExecutionContext.Implicits.global
val query = SQLQuery(s"SELECT * FROM table1 WHERE col1 IN ('someString', 'someString2')")

val hello = executeQuery(query)
println(hello)

def executeQuery(foo: SQLQuery) = {
    foo.execute.onComplete {
      case Success(resp) => {
        println(resp)
        resp
      }
      case Failure(ex) => ex.printStackTrace()
    }
}

原因是我可能有一个字符串列表而不是单个字符串,所以我想:

queryList = List("someString1", ..., "someStringN")
val splitted: List[List[String]] = queryList.grouped(1000).toList
val temp = splittedQueryList.par
temp.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
val hello = temp.map(x => executeQuery(query, x)).seq

但是hello得到的不是值而是()。
你可能还会问我为什么要从一个完整的地方回来?因为我想用收到的响应构建sparkDataframe。

qcbq4gxm

qcbq4gxm1#

这是因为 onCompleteUnit 返回类型,在其签名处获取战利品:

def onComplete[U](@deprecatedName('func) f: Try[T] => U)(implicit executor: ExecutionContext): Unit

目的 onComplete 是为添加回调 success 以及 failure 分支机构 Future 评价。这个 onComplete 目的不是为了让未来的成功价值成为自己的东西,更多的是在结果类型(不是价值)的情况下添加一些处理: Success 或者 Failure . map 函数更适合你的使用目的,一些成功的结果来自 Future . 它会将结果值更改为您需要的值,而不会丢失异步性。在斯卡拉, Future 通常用于异步处理,最终结果通常是使用

Await.result(yourFuture, timeOutLimit) //it will return Future value.

所以,如果您想处理对数据库的一系列查询,请按顺序处理它们并获得一系列结果。这种技术被称为未来合成。
我想建议你更好的方法:
使用 map 中的函数 Future 例如,将查询到db的某些结果Map到您期望的类型。改变 executeQuery 返回的签名 Future[ResultType] 包裹 temp.map(x => executeQuery(query, x))Future.sequence 函数按顺序进行查询和转换 Seq[Future[ResultType]]Future[Seq[ResultType]] .
过程 Future[Seq[ResultType]] 使用 map 函数也可以得到最终结果 YourResultTypeProcessingType .
您的代码应如下所示:

def executeQuery(foo: SQLQuery): Future[ResultType] = {
  val future = foo.execute.map {
    resp =>
      println(resp)
      resp
  }
  // if you want to print failure exceptions, but I would prefer to use `recoverWith` function to handle failures.
  future.onFailure{
    case ex => ex.printStackTrace()
  }
  future
}

val helloFuture = executeQuery(query)
println(helloFuture)

queryList = List("someString1", ..., "someStringN")
val splitted: List[List[String]] = queryList.grouped(1000).toList
val temp = splittedQueryList.par
temp.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(10))
val hellosFuture = Future.sequence(temp.map(x => executeQuery(query, x)))

def doSmth(value: Seq[ResultType]): YourResultTypeProcessingType = ???

hellosFuture.map{
  successHellosList =>
    doSmth(successHellosList)
}

了解scala未来的有用链接:
scala未来概述
期货组合,自由代码阵营

相关问题