scala—并行保存两个RDD

whlutmcx  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(343)

accesslogs.saveastextfile(outputdirectory1)
accesslist.saveastextfile(输出目录2)
如何并行而不是串联地保存rdd?

aiazj4mn

aiazj4mn1#

您可以将它们保存在线程中。

new Thread() {
      override def run(): Unit = {
accessLogs.saveAsTextFile(outputDirectory1) 
      }
    }.start()

  new Thread() {
      override def run(): Unit = {
accessList.saveAsTextFile(outputDirectory2)
      }
    }.start()
``` `saveAsTextFile` 不返回任何内容,所以我不知道为什么要设置返回值。
vi4fp9gy

vi4fp9gy2#

import scala.concurrent._
import scala.concurrent.duration._

val rdds = Seq(accessLogs, accessLists)
val dirs = Seq("outputDirectory1", "outputDirectory2")

import ExecutionContext.Implicits.global
val future = Future.sequence(
  for ((rdd, dir) <- rdds zip dirs) yield Future(rdd.saveAsTextFile(dir))
)
//Await.ready(future, Duration.Inf) //to wait for rdds to be saved...

请注意,尽管名称不同,方法 sequenceFuture 上面使用的伴随对象将执行 Futures 由平行的而不是顺序的理解而产生的。这个 sequence 方法本质上是一个应用函子 sequence .

相关问题