最小化Spark会话/配置以实现最佳单元测试性能?

tyg4sfes  于 2023-05-23  发布在  Apache
关注(0)|答案(1)|浏览(142)

Spark 2.1.0和Scala 2.11。
我们的项目有数百个单元测试,它们执行相对简单的操作,例如创建3个或4个对象的数据集,并对它们执行简单的转换。这些测试中的许多测试需要长达5-10秒的时间来运行,数百个测试加起来需要很多分钟,这对我们的CI构建来说是一个问题。操作是如此简单,我想知道是否有一个Spark配置,我们可以使用,以加快速度。
例如,简单地创建这样的数据集:

val histData = Seq(
  FooType(id = "id1", code = "code1",orgId = 1l),
  FooType(id = "id2", code = "code2",orgId = 1l),
  FooType(id = "id3", code = "code3",orgId = 1l)
).toDS()

需要800毫秒(FooType是一个case类)。在创建了2到3个这样的数据集和一些过滤器/Map/连接操作之后(我真的不认为这些细节很重要,但如果你让我知道,我会发布它们),collect()需要1000-2000毫秒。把几个这样的操作加起来,一个测试可能需要5-10秒。
对于单元测试,我们只关心测试的功能方面,我们不需要线程,缩放,缓存,磁盘存储等。测试数据很小(通常小于1 KB),并且在内存中创建(不从磁盘或任何外部源读取),并且在内存中转换的对象上执行Assert。我知道Spark可能会在幕后调用DAGScheduler、代码生成器等。我想知道是否有一种方法可以在没有该功能的情况下执行作业。或者,如果必须这样做,在单元测试套件开始时做一次,并在整个过程中使用它。
创建会话时使用如下内容:

session = SparkSession.builder.config("spark.sql.shuffle.partitions","10").getOrCreate()

并且在每个单元测试中使用相同的会话。我们直接从单元测试中调用Spark API方法,因此没有Spark提交或单独的进程或作业,它都在调用单元测试的IDE或gradle创建的JVM中运行。
在我看来,这些操作应该每次花费几毫秒,我正在寻找一种方法来帕雷Spark配置,以便它以最快的方式评估内存中的所有内容。感谢任何提示或想法。

b4lqfgs4

b4lqfgs41#

从我个人使用Scala的经验来看。
1.创建测试 Package 对象

object SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession.builder()
      .appName("Unit Test")
      .master("local[*]")
      .getOrCreate()
  }
}

优化性能的一个好方法是在创建spark会话时禁用UI

object SparkSessionTestWrapper {
  lazy val spark: SparkSession = {
    SparkSession.builder()
      .appName("Unit Test")
      .master("local[*]")
      .config("spark.ui.enabled", "false")  
      .getOrCreate()
  }
}

lazy瓦尔spark确保spark会话只创建一次并被使用。
1.然后在每个UT类中调用SparkSessionTestWrapper,它将使用相同的“共享”spark会话,因此不需要为每个UT创建一个:

class UnitTest1 {
  import SparkSessionTestWrapper._

  test("Test case 1") {
    import spark.implicits._

    val histData = Seq(
      FooType(id = "id1", code = "code1",orgId = 1l),
      FooType(id = "id2", code = "code2",orgId = 1l),
      FooType(id = "id3", code = "code1",orgId = 1l)
    ).toDS()

    val result = histData.filter($"orgId " === 1l).select("code").distinct().collect()

    assert(result.length == 2)
    //other assertions ...
  }
}

相关问题