如何并行和分布式执行多个查询?

ldioqlga  于 2021-05-27  发布在  Spark
关注(0)|答案(2)|浏览(416)

我使用的是spark 2.4.1版本和Java8。
我有这样的场景:
将提供属性文件中要处理的分类器列表。
这些分类器决定了要提取和处理的数据。
如下所示:

val classifiers = Seq("classifierOne","classifierTwo","classifierThree");

for( classifier : classifiers ){
  // read from CassandraDB table   
  val acutalData = spark.read(.....).where(<classifier conditition>)

  // the data varies depend on the classifier passed in 
  // this data has many fields along with fieldOne, fieldTwo and fieldThree

取决于分类器,我需要过滤数据。目前我正在做以下工作:

if(classifier.===("classifierOne")) {
  val classifierOneDs =  acutalData.filter(col("classifierOne").notEqual(lit("")).or(col("classifierOne").isNotNull()));
  writeToParquet(classifierOneDs);                           
} else if(classifier.===("classifierTwo")) {
  val classifierTwoDs =  acutalData.filter(col("classifierTwo").notEqual(lit("")).or(col("classifierTwo").isNotNull()));
  writeToParquet(classifierOneDs);
} else (classifier.===("classifierThree")) {
  val classifierThreeDs =  acutalData.filter(col("classifierThree").notEqual(lit("")).or(col("classifierThree").isNotNull()));
  writeToParquet(classifierOneDs);
}

有没有办法避免 if - else 在这堵?有没有其他方法可以做到这一点?

jucafojl

jucafojl1#

您的问题似乎更多地是关于如何构造应用程序,而不是spark本身。实际上有两部分。
有没有办法避开这里的if-else障碍?
“避免”?在什么意义上?spark无法神奇地“发现”您的分布式处理方式。你应该帮点忙。
对于这种情况,我建议使用一个包含所有可能的过滤条件及其名称的查找表,例如。

val classifiers = Map(
  "classifierOne" -> col("classifierOne").notEqual(lit("")).or(col("classifierOne").isNotNull()),
  "classifierTwo" -> ...,
  "classifierThree" -> ...)

为了使用它,你只需遍历所有的分类器(或者根据需要查找尽可能多的分类器)。

val queries = classifiers.map { case (name, cond) =>
  spark
    .read(.....)
    .where(cond)
    .filter(col(name).notEqual(lit("")).or(col(name).isNotNull()))
}
``` `queries` 是要创建的Dataframe的集合 `writeToParquet` 如何使查询并行执行取决于您(spark将负责以分布式方式执行)。使用scala futures或其他并行api。
我认为以下几点可以让它变得很好:

queries.par.foreach(writeToParquet)

与 `queries.par.foreach` 你基本上执行所有 `writeToParquet` 同时进行。自 `writeToParquet` 执行一个Dataframe操作,以符合任何其他操作的所有spark规则的Parquet格式写入。它将运行一个spark作业(甚至可能不止一个),并使用spark机器以分布式方式执行该作业。
想想 `queries.par` 作为一种逐个执行查询的方法,无需等待前面的查询完成后再开始新的查询。强烈建议您配置公平调度模式:
在给定的spark应用程序(sparkcontext示例)中,如果多个并行作业是从不同的线程提交的,那么它们可以同时运行。
在公平共享下,spark以“循环”的方式在作业之间分配任务,这样所有作业都可以获得大致相等的集群资源份额。
oug3syen

oug3syen2#

所以,您需要根据分类器名称选择要检查的列,该列将作为列表传递?

val classifiers = Seq("classifierOne","classifierTwo","classifierThree");

for(classifier : classifiers) {

    val acutalData = spark.read(.....).where(<classifier conditition>)
    val classifierDs =  acutalData.filter(col(classifier).notEqual(lit("")).or(col(classifier).isNotNull()));
    writeToParquet(classifierDs);

}

在遍历列表时,无论如何都要遍历所有分类器。如果列名可以和实际的分类器名不同,则可以将其设置为 List[Classifier] ,在哪里 Classifier 有点像 case class Classifier(colName: String, classifierName: String)

相关问题