junit 如何在Spark 2.0+中编写单元测试?

vwkv1x7d  于 2022-11-11  发布在  Spark
关注(0)|答案(6)|浏览(165)

我一直在尝试找到一种合理的方法来使用JUnit测试框架测试SparkSession,虽然似乎有一些很好的SparkContext示例,但我不知道如何获得一个适用于SparkSession的相应示例,即使它在spark-testing-base内部的几个地方被使用。我很乐意尝试一个不使用Spark测试的解决方案-如果这真的不是正确的方法。
简单测试用例(complete MWE projectbuild.sbt):

import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite

import org.apache.spark.sql.SparkSession

class SessionTest extends FunSuite with DataFrameSuiteBase {

  implicit val sparkImpl: SparkSession = spark

  @Test
  def simpleLookupTest {

    val homeDir = System.getProperty("user.home")
    val training = spark.read.format("libsvm")
      .load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
    println("completed simple lookup test")
  }

}

使用JUnit运行此命令的结果是在加载行上出现一个NPE:

java.lang.NullPointerException
    at SessionTest.simpleLookupTest(SessionTest.scala:16)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

请注意,加载的文件是否存在并不重要;在正确配置的SparkSession中,为more sensible error will be thrown

gwbalxhn

gwbalxhn1#

感谢您提出这个突出的问题。由于某种原因,当谈到Spark时,每个人都太过沉迷于分析,以至于忘记了过去15年左右出现的伟大的软件工程实践。这就是为什么我们在课程中重点讨论测试和持续集成(以及DevOps等其他内容)。

术语简介

一个 true 的单元测试意味着您可以完全控制测试中的每个组件,不需要与数据库、REST调用、文件系统甚至系统时钟进行交互;正如Gerard Mezaros在xUnit Test Patterns中所说的那样,所有的东西都必须被“加倍”(例如,mocked,stubbed等)。我知道这看起来像是语义,但它确实很重要。未能理解这一点是您在持续集成中看到间歇性测试失败的一个主要原因。

我们仍然可以进行单元测试

因此,基于这样的理解,对RDD进行单元测试是不可能的。然而,在开发分析时,单元测试仍然有一席之地。
考虑一个简单的操作:

rdd.map(foo).map(bar)

这里的foobar是简单的函数。它们可以用普通的方法进行单元测试,并且应该使用尽可能多的极端情况。毕竟,为什么它们关心从哪里获得输入,是从测试设备还是从RDD
"别忘了Spark弹“
这不是测试 * 本身 *,但是在这些早期阶段,您还应该在Spark shell中进行试验,以弄清楚您的转换,尤其是您的方法的结果。例如,您可以检查物理和逻辑查询计划、分区策略和保存,以及使用许多不同的函数(如toDebugStringexplainglom)的数据状态。showprintSchema等等。我将让您探索这些。
您还可以在Spark shell和测试中将master设置为local[2],以识别只有在开始分发工作时才可能出现的任何问题。

与Spark的集成测试

现在来点有趣的吧。
为了在您对您的助手函数和RDD/DataFrame转换逻辑的质量有信心之后 * 集成测试 * Spark,做几件事是至关重要的(不管构建工具和测试框架如何):

  • 增加JVM内存。
  • 启用分支但禁用并行执行。
  • 使用您的测试框架将您的Spark集成测试累积到suites中,并在所有测试之前初始化SparkContext,并在所有测试之后停止它。

使用ScalaTest,您可以混合使用BeforeAndAfterAll(我通常更喜欢使用它)或BeforeAndAfterEach,就像@ShankarKoirala所做的那样,以初始化和拆除Spark工件。我知道这是一个合理的例外,但我真的不喜欢您必须使用的那些可变的var

贷款模式

另一种方法是使用Loan Pattern
例如(使用ScalaTest):

class MySpec extends WordSpec with Matchers with SparkContextSetup {
  "My analytics" should {
    "calculate the right thing" in withSparkContext { (sparkContext) =>
      val data = Seq(...)
      val rdd = sparkContext.parallelize(data)
      val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

      total shouldBe 1000
    }
  }
}

trait SparkContextSetup {
  def withSparkContext(testMethod: (SparkContext) => Any) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("Spark test")
    val sparkContext = new SparkContext(conf)
    try {
      testMethod(sparkContext)
    }
    finally sparkContext.stop()
  }
}

正如您所看到的,Loan模式使用高阶函数将SparkContext“借出”给测试,然后在测试完成后将其丢弃。

以痛苦为导向的编程(谢谢,Nathan)

这完全是一个偏好问题,但我更喜欢使用LoanPattern,在引入另一个框架之前尽可能自己连接这些东西。框架有时会添加很多“魔法”,使得调试测试失败很难解释。在这种情况下,我避免添加新的框架,直到没有它的痛苦无法承受为止。
对于这个替代框架,最好的选择当然是spark-testing-base,正如@ShankarKoirala所提到的那样。

class MySpec extends WordSpec with Matchers with SharedSparkContext {
      "My analytics" should {
        "calculate the right thing" in { 
          val data = Seq(...)
          val rdd = sc.parallelize(data)
          val total = rdd.map(...).filter(...).map(...).reduce(_ + _)

          total shouldBe 1000
        }
      }
 }

请注意,我不需要做任何事情来处理SparkContextSharedSparkContext免费提供了所有这些--sc作为SparkContext。就我个人而言,我不会仅仅为了这个目的而引入这个依赖项,因为LoanPattern正好满足了我的需要。此外,由于分布式系统中发生了如此多的不可预测性,当在持续集成中出现问题时,必须跟踪第三方库的源代码中发生的奇迹可能是一件真实的的痛苦。
现在,spark-testing-base 真正的亮点是基于Hadoop的助手,如HDFSClusterLikeYARNClusterLike。将这些特性混合在一起,可以真正为您节省大量的设置痛苦。它的另一个亮点是类似Scalacheck的属性和生成器--当然,前提是您了解基于属性的测试是如何工作的,以及它为什么有用。但是,我个人会推迟使用它,直到我的分析和测试达到那个复杂的水平。
“只有西斯做的是绝对的事”--欧比万·克诺比
当然,你也不必选择其中之一,也许你可以在大多数测试中使用Loan Pattern方法,而 spark-testing-base 只在少数几个更严格的测试中使用。您可以两者兼顾。

Spark Streaming集成测试

最后,我只想展示一个片段,展示带有内存值的SparkStreaming集成测试设置在没有 spark-testing-base 的情况下可能会是什么样子:

val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd

这比看起来要简单。它实际上只是将一个数据序列转换成一个队列,然后输入到DStream。它的大部分内容实际上只是与Spark API一起工作的样板设置。无论如何,您可以将其与StreamingSuiteBaseas found inspark-testing-base 进行比较,以决定您更喜欢哪一个。
这可能是我写的最长的一篇文章,所以我将把它留在这里。我希望其他人也能加入进来,用同样的敏捷软件工程实践来帮助我们提高分析的质量,这些实践已经改进了所有其他的应用程序开发。
对于这个无耻的插件,我们深表歉意,你可以查看我们的课程Software Engineering with Apache Spark,在那里我们讨论了很多这样的想法和更多。我们希望很快有一个在线版本。

ffx8fchx

ffx8fchx2#

您可以使用FunSuite和BeforeAndAfterEach编写一个简单的测试,如下所示

class Tests extends FunSuite with BeforeAndAfterEach {

  var sparkSession : SparkSession = _
  override def beforeEach() {
    sparkSession = SparkSession.builder().appName("udf testings")
      .master("local")
      .config("", "")
      .getOrCreate()
  }

  test("your test name here"){
    //your unit test assert here like below
    assert("True".toLowerCase == "true")
  }

  override def afterEach() {
    sparkSession.stop()
  }
}

你不需要在测试中创建一个函数,你可以简单地写为

test ("test name") {//implementation and assert}

霍尔顿Karau写了一个非常好的测试spark-testing-base
下面是一个简单的需要结帐的例子

class TestSharedSparkContext extends FunSuite with SharedSparkContext {

  val expectedResult = List(("a", 3),("b", 2),("c", 4))

  test("Word counts should be equal to expected") {
    verifyWordCount(Seq("c a a b a c b c c"))
  }

  def verifyWordCount(seq: Seq[String]): Unit = {
    assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
  }
}

希望这对你有帮助!

qv7cva1a

qv7cva1a3#

由于Spark 1.6,您可以使用Spark用于其自身单元测试的SharedSparkContextSharedSQLContext

class YourAppTest extends SharedSQLContext {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    val df = sqlContext.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

由于Spark 2.3SharedSparkSession可用:

class YourAppTest extends SharedSparkSession {

  var app: YourApp = _

  protected override def beforeAll(): Unit = {
    super.beforeAll()

    app = new YourApp
  }

  protected override def afterAll(): Unit = {
    super.afterAll()
  }

  test("Your test") {
    df = spark.read.json("examples/src/main/resources/people.json")

    app.run(df)
  }

更新日期:

Maven依赖项:

<dependency>
  <groupId>org.scalactic</groupId>
  <artifactId>scalactic</artifactId>
  <version>SCALATEST_VERSION</version>
</dependency>
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest</artifactId>
  <version>SCALATEST_VERSION</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql</artifactId>
  <version>SPARK_VERSION</version>
  <type>test-jar</type>
  <scope>test</scope>
</dependency>

SBT依赖性:

"org.scalactic" %% "scalactic" % SCALATEST_VERSION
"org.scalatest" %% "scalatest" % SCALATEST_VERSION % "test"
"org.apache.spark" %% "spark-core" % SPARK_VERSION % Test classifier "tests"
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"

此外,您还可以查看Spark的test sources,其中有大量的各种测试套件。

更新2:

Apache Spark Unit Testing Part 1 — Core Components
Apache Spark Unit Testing Part 2 — Spark SQL
Apache Spark Unit Testing Part 3 — Streaming
Apache Spark Integration Testing
Test Driven Development of Apache Spark applications

ncgqoxb0

ncgqoxb04#

我喜欢创建一个可以混合到测试类中的SparkSessionTestWrapper特征。Shankar的方法是可行的,但是对于包含多个文件的测试套件来说,速度太慢了。

import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  lazy val spark: SparkSession = {
    SparkSession.builder().master("local").appName("spark session").getOrCreate()
  }

}

该特征可按如下方式使用:

class DatasetSpec extends FunSpec with SparkSessionTestWrapper {

  import spark.implicits._

  describe("#count") {

    it("returns a count of all the rows in a DataFrame") {

      val sourceDF = Seq(
        ("jets"),
        ("barcelona")
      ).toDF("team")

      assert(sourceDF.count === 2)

    }

  }

}

查看spark-spec项目,了解使用SparkSessionTestWrapper方法的真实示例。

更新

当某些特性混合到测试类中时,spark-testing-base library会自动添加SparkSession(例如,当DataFrameSuiteBase混合到测试类中时,您将可以通过spark变量访问SparkSession)。
我创建了一个单独的测试库spark-fast-tests,给予用户在运行他们的测试时完全控制SparkSession。我不认为测试助手库应该设置SparkSession。用户应该能够在他们认为合适的时候启动和停止他们的SparkSession(我喜欢创建一个SparkSession,并在整个测试套件运行中使用它)。
下面是spark-fast-tests assertSmallDatasetEquality方法的一个实际应用示例:

import com.github.mrpowers.spark.fast.tests.DatasetComparer

class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {

  import spark.implicits._

    it("aliases a DataFrame") {

      val sourceDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("name")

      val actualDF = sourceDF.select(col("name").alias("student"))

      val expectedDF = Seq(
        ("jose"),
        ("li"),
        ("luisa")
      ).toDF("student")

      assertSmallDatasetEquality(actualDF, expectedDF)

    }

  }

}
tez616oj

tez616oj5#

我可以用下面的代码解决这个问题
spark-hive依赖项已添加到项目pom中

class DataFrameTest extends FunSuite with DataFrameSuiteBase{
        test("test dataframe"){
        val sparkSession=spark
        import sparkSession.implicits._
        var df=sparkSession.read.format("csv").load("path/to/csv")
        //rest of the operations.
        }
        }
z8dt9xmd

z8dt9xmd6#

另一种使用JUnit进行单元测试的方法

import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.{After, Before, _}

@Test
class SessionSparkTest {
  var spark: SparkSession = _

  @Before
  def beforeFunction(): Unit = {
    //spark = SessionSpark.getSparkSession()
    spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
    System.out.println("Before Function")
  }

  @After
  def afterFunction(): Unit = {
    spark.stop()
    System.out.println("After Function")
  }

  @Test
  def testRddCount() = {
    val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
    val count = rdd.count()
    assertTrue(3 == count)
  }

  @Test
  def testDfNotEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
    assertFalse(numDf.head(1).isEmpty)
  }

  @Test
  def testDfEmpty() = {
    val sqlContext = spark.sqlContext
    import sqlContext.implicits._
    val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
    assertTrue(emptyDf.head(1).isEmpty)
  }
}

case class Num(id: Int)

相关问题