我一直在尝试找到一种合理的方法来使用JUnit测试框架测试SparkSession
,虽然似乎有一些很好的SparkContext
示例,但我不知道如何获得一个适用于SparkSession
的相应示例,即使它在spark-testing-base内部的几个地方被使用。我很乐意尝试一个不使用Spark测试的解决方案-如果这真的不是正确的方法。
简单测试用例(complete MWE project与build.sbt
):
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite
import org.apache.spark.sql.SparkSession
class SessionTest extends FunSuite with DataFrameSuiteBase {
implicit val sparkImpl: SparkSession = spark
@Test
def simpleLookupTest {
val homeDir = System.getProperty("user.home")
val training = spark.read.format("libsvm")
.load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
println("completed simple lookup test")
}
}
使用JUnit运行此命令的结果是在加载行上出现一个NPE:
java.lang.NullPointerException
at SessionTest.simpleLookupTest(SessionTest.scala:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
请注意,加载的文件是否存在并不重要;在正确配置的SparkSession中,为more sensible error will be thrown。
6条答案
按热度按时间gwbalxhn1#
感谢您提出这个突出的问题。由于某种原因,当谈到Spark时,每个人都太过沉迷于分析,以至于忘记了过去15年左右出现的伟大的软件工程实践。这就是为什么我们在课程中重点讨论测试和持续集成(以及DevOps等其他内容)。
术语简介
一个 true 的单元测试意味着您可以完全控制测试中的每个组件,不需要与数据库、REST调用、文件系统甚至系统时钟进行交互;正如Gerard Mezaros在xUnit Test Patterns中所说的那样,所有的东西都必须被“加倍”(例如,mocked,stubbed等)。我知道这看起来像是语义,但它确实很重要。未能理解这一点是您在持续集成中看到间歇性测试失败的一个主要原因。
我们仍然可以进行单元测试
因此,基于这样的理解,对
RDD
进行单元测试是不可能的。然而,在开发分析时,单元测试仍然有一席之地。考虑一个简单的操作:
这里的
foo
和bar
是简单的函数。它们可以用普通的方法进行单元测试,并且应该使用尽可能多的极端情况。毕竟,为什么它们关心从哪里获得输入,是从测试设备还是从RDD
?"别忘了Spark弹“
这不是测试 * 本身 *,但是在这些早期阶段,您还应该在Spark shell中进行试验,以弄清楚您的转换,尤其是您的方法的结果。例如,您可以检查物理和逻辑查询计划、分区策略和保存,以及使用许多不同的函数(如
toDebugString
、explain
、glom
)的数据状态。show
、printSchema
等等。我将让您探索这些。您还可以在Spark shell和测试中将master设置为
local[2]
,以识别只有在开始分发工作时才可能出现的任何问题。与Spark的集成测试
现在来点有趣的吧。
为了在您对您的助手函数和
RDD
/DataFrame
转换逻辑的质量有信心之后 * 集成测试 * Spark,做几件事是至关重要的(不管构建工具和测试框架如何):SparkContext
,并在所有测试之后停止它。使用ScalaTest,您可以混合使用
BeforeAndAfterAll
(我通常更喜欢使用它)或BeforeAndAfterEach
,就像@ShankarKoirala所做的那样,以初始化和拆除Spark工件。我知道这是一个合理的例外,但我真的不喜欢您必须使用的那些可变的var
。贷款模式
另一种方法是使用Loan Pattern。
例如(使用ScalaTest):
正如您所看到的,Loan模式使用高阶函数将
SparkContext
“借出”给测试,然后在测试完成后将其丢弃。以痛苦为导向的编程(谢谢,Nathan)
这完全是一个偏好问题,但我更喜欢使用LoanPattern,在引入另一个框架之前尽可能自己连接这些东西。框架有时会添加很多“魔法”,使得调试测试失败很难解释。在这种情况下,我避免添加新的框架,直到没有它的痛苦无法承受为止。
对于这个替代框架,最好的选择当然是spark-testing-base,正如@ShankarKoirala所提到的那样。
请注意,我不需要做任何事情来处理
SparkContext
。SharedSparkContext
免费提供了所有这些--sc
作为SparkContext
。就我个人而言,我不会仅仅为了这个目的而引入这个依赖项,因为LoanPattern正好满足了我的需要。此外,由于分布式系统中发生了如此多的不可预测性,当在持续集成中出现问题时,必须跟踪第三方库的源代码中发生的奇迹可能是一件真实的的痛苦。现在,spark-testing-base 真正的亮点是基于Hadoop的助手,如
HDFSClusterLike
和YARNClusterLike
。将这些特性混合在一起,可以真正为您节省大量的设置痛苦。它的另一个亮点是类似Scalacheck的属性和生成器--当然,前提是您了解基于属性的测试是如何工作的,以及它为什么有用。但是,我个人会推迟使用它,直到我的分析和测试达到那个复杂的水平。“只有西斯做的是绝对的事”--欧比万·克诺比
当然,你也不必选择其中之一,也许你可以在大多数测试中使用Loan Pattern方法,而 spark-testing-base 只在少数几个更严格的测试中使用。您可以两者兼顾。
Spark Streaming集成测试
最后,我只想展示一个片段,展示带有内存值的SparkStreaming集成测试设置在没有 spark-testing-base 的情况下可能会是什么样子:
这比看起来要简单。它实际上只是将一个数据序列转换成一个队列,然后输入到
DStream
。它的大部分内容实际上只是与Spark API一起工作的样板设置。无论如何,您可以将其与StreamingSuiteBase
as found inspark-testing-base 进行比较,以决定您更喜欢哪一个。这可能是我写的最长的一篇文章,所以我将把它留在这里。我希望其他人也能加入进来,用同样的敏捷软件工程实践来帮助我们提高分析的质量,这些实践已经改进了所有其他的应用程序开发。
对于这个无耻的插件,我们深表歉意,你可以查看我们的课程Software Engineering with Apache Spark,在那里我们讨论了很多这样的想法和更多。我们希望很快有一个在线版本。
ffx8fchx2#
您可以使用FunSuite和BeforeAndAfterEach编写一个简单的测试,如下所示
你不需要在测试中创建一个函数,你可以简单地写为
霍尔顿Karau写了一个非常好的测试spark-testing-base
下面是一个简单的需要结帐的例子
希望这对你有帮助!
qv7cva1a3#
由于Spark 1.6,您可以使用Spark用于其自身单元测试的
SharedSparkContext
或SharedSQLContext
:由于Spark 2.3
SharedSparkSession
可用:更新日期:
Maven依赖项:
SBT依赖性:
此外,您还可以查看Spark的test sources,其中有大量的各种测试套件。
更新2:
Apache Spark Unit Testing Part 1 — Core Components
Apache Spark Unit Testing Part 2 — Spark SQL
Apache Spark Unit Testing Part 3 — Streaming
Apache Spark Integration Testing
Test Driven Development of Apache Spark applications
ncgqoxb04#
我喜欢创建一个可以混合到测试类中的
SparkSessionTestWrapper
特征。Shankar的方法是可行的,但是对于包含多个文件的测试套件来说,速度太慢了。该特征可按如下方式使用:
查看spark-spec项目,了解使用
SparkSessionTestWrapper
方法的真实示例。更新
当某些特性混合到测试类中时,spark-testing-base library会自动添加SparkSession(例如,当
DataFrameSuiteBase
混合到测试类中时,您将可以通过spark
变量访问SparkSession)。我创建了一个单独的测试库spark-fast-tests,给予用户在运行他们的测试时完全控制SparkSession。我不认为测试助手库应该设置SparkSession。用户应该能够在他们认为合适的时候启动和停止他们的SparkSession(我喜欢创建一个SparkSession,并在整个测试套件运行中使用它)。
下面是spark-fast-tests
assertSmallDatasetEquality
方法的一个实际应用示例:tez616oj5#
我可以用下面的代码解决这个问题
spark-hive依赖项已添加到项目pom中
z8dt9xmd6#
另一种使用JUnit进行单元测试的方法