I'm trying to write unit tests for a Spark Streaming application that works with DStreams.
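I found the following suite very helpful: StreamingSuiteBase. It contains a method called testOperation, to which you pass the input, the operation to test, and the expected output. It then verifies that the expected output matches the actual output.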
The problem I'm facing is that during the equality check I get exactly the same object, but wrapped in different collections:
Expected: List(myObject)
Actual: Array(myObject)
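A small Spark-free illustration of why "same object, different wrapper" can fail an equality check in plain Scala. MyObject is a hypothetical stand-in for the question's myObject; this sketch does not model ScalaTest's or Scalactic's own equality rules, only the behaviour of equals and == on Scala collections.

```scala
object CollectionEqualityDemo {
  // Hypothetical stand-in for the question's myObject; a case class compares by field values
  final case class MyObject(id: Int)

  def main(args: Array[String]): Unit = {
    val obj = MyObject(1)
    val expected: List[MyObject] = List(obj)
    val actual: Array[MyObject] = Array(obj)

    // An Array is a plain JVM array, not a Seq, so it is never equal to a List
    println(expected.equals(actual))       // false
    // Two arrays with the same contents are still different objects (reference equality)
    println(actual.equals(Array(obj)))     // false
    // Viewing the Array as a Seq makes the comparison element by element
    println(expected == actual.toSeq)      // true
    // sameElements compares contents regardless of the concrete collection type
    println(expected.sameElements(actual)) // true
  }
}
```

In other words, if the actual batch really is an Array, converting it with toSeq (or comparing with sameElements) is what makes the contents, rather than the wrappers, decide the result.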
testOperation is defined as follows:
def testOperation[U: ClassTag, V: ClassTag](
    input: Seq[Seq[U]],
    operation: DStream[U] => DStream[V],
    expectedOutput: Seq[Seq[V]],
    ordered: Boolean
  ) (implicit equality: Equality[V]): Unit = {
  val numBatches = input.size
  withOutputAndStreamingContext(setupStreams[U, V](input, operation)) {
    (outputStream, ssc) =>
      val output: Seq[Seq[V]] = runStreams[V](
        outputStream, ssc, numBatches, expectedOutput.size)
      verifyOutput[V](output, expectedOutput, ordered)
  }
}
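In this signature the outer Seq of input and expectedOutput is the list of micro-batches (numBatches = input.size), and V is the element type of the output DStream. The following Spark-free model is only a sketch of that shape: runBatches and wordPairs are hypothetical helpers, not part of StreamingSuiteBase. For an operation that returns pairs, such as the question's DStream[(String, myFinalObject)], the expected output would likewise be a Seq[Seq[(String, myFinalObject)]].

```scala
object TestOperationModel {
  // Hypothetical, Spark-free model: each inner Seq is one micro-batch, and the
  // operation is applied to every batch independently.
  def runBatches[U, V](input: Seq[Seq[U]], operation: Seq[U] => Seq[V]): Seq[Seq[V]] =
    input.map(operation)

  // Example per-batch operation producing (String, Int) pairs, analogous to a
  // DStream[String] => DStream[(String, Int)] transformation.
  def wordPairs(batch: Seq[String]): Seq[(String, Int)] =
    batch.flatMap(_.split(" ").toSeq).map(word => (word, 1))

  def main(args: Array[String]): Unit = {
    val input: Seq[Seq[String]] = List(List("a b"), List("c"))
    // V = (String, Int), so the expected output is Seq[Seq[(String, Int)]]: one inner List per batch
    val expected: Seq[Seq[(String, Int)]] = List(List(("a", 1), ("b", 1)), List(("c", 1)))
    println(runBatches[String, (String, Int)](input, wordPairs) == expected) // true
  }
}
```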
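This does not let me pass List(Array(myObject)) as the expected output, so my second option was to modify the verifyOutput method. I planned to override it, copying the code from the base class and adding just a few lines that produce List(Array(myObject)). Something like this (updated):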
override def verifyOutput[V](output: Seq[Seq[V]],
                             expectedOutput: Seq[Seq[V]],
                             ordered: Boolean)
                            (implicit evidence$1: ClassTag[V], equality: Equality[V]): Unit = {
  super.verifyOutput(output, expectedOutput, ordered)

  // These three lines are what I am planning to add
  val sq = expectedOutput(0)
  val ssq = sq(0)
  val newOutput = Seq(Array(ssq))

  logInfo("--------------------------------")
  logInfo("output.size = " + output.size)
  logInfo("output")
  output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
  logInfo("expected output.size = " + expectedOutput.size)
  logInfo("expected output")
  expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
  logInfo("--------------------------------")

  // Match the output with the expected output
  assert(output.size === expectedOutput.size, "Number of outputs do not match")
  if (ordered) {
    for (i <- output.indices)
      equalsOrdered(output(i), expectedOutput(i))
  } else {
    for (i <- output.indices)
      equalsUnordered(output(i), expectedOutput(i))
  }
  logInfo("Output verified successfully")
}
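The check at the end of verifyOutput compares output and expected output batch by batch, either in order or ignoring order. The sketch below models that idea without Spark. It is not the library's equalsOrdered/equalsUnordered; fromArrays, counts and batchesMatch are hypothetical helpers. It also shows that converting Array batches to Seqs before comparing removes the List-versus-Array mismatch.

```scala
object VerifyOutputSketch {
  // Hypothetical helper: turn batches collected as Arrays into Seqs, so that
  // comparisons are element-wise instead of by array reference.
  def fromArrays[V](batches: Seq[Array[V]]): Seq[Seq[V]] =
    batches.map(_.toSeq)

  // Count occurrences so that the unordered comparison respects duplicates.
  private def counts[V](xs: Seq[V]): Map[V, Int] =
    xs.groupBy(identity).map { case (k, vs) => k -> vs.size }

  // Batch-by-batch check in the spirit of verifyOutput's ordered/unordered branches.
  def batchesMatch[V](output: Seq[Seq[V]], expected: Seq[Seq[V]], ordered: Boolean): Boolean =
    output.size == expected.size &&
      output.zip(expected).forall { case (out, exp) =>
        if (ordered) out == exp else counts(out) == counts(exp)
      }
}
```

Usage: batchesMatch(fromArrays(Seq(Array(1, 2))), Seq(List(1, 2)), ordered = true) holds, because both sides are Seqs by the time they are compared.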
The full StreamingSuiteBase source can be found here.
But I get the following error in Eclipse:
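method verifyOutput overrides nothing. Note: the super classes of class myClass contain the following, non final members named verifyOutput: def verifyOutput[V](output: Seq[Seq[V]], expectedOutput: Seq[Seq[V]], ordered: Boolean)(implicit evidence$1: scala.reflect.ClassTag[V], implicit equality: org.scalactic.Equality[V]): Unit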
Here is a simplified version of my test case:
import org.scalatest.FunSuite

class myClass extends StreamingSuiteBase with FunSuite {

  test("ExtCustProfileHbaseAPI") {
    // Here I would be generating my input and expected output
    val inputData = new myInitialObject()
    val expOutput = new myFinalObject()
    testOperation(inputData, processTest _, expOutput, ordered = false)
  }

  def processTest(input: DStream[myInitialObject]): DStream[(String, myFinalObject)] = {
    // Operation under test
    val result = operation(input)
    result
  }

  // Here I added the override def verifyOutput[V: ClassTag]...
}
What am I doing wrong?
1 answer
According to the source, the StreamingSuiteBase trait has the self-type org.scalatest.Suite.
This means you must also extend a Suite type (FunSuite in your example); otherwise it will not compile. See: https://github.com/holdenk/spark-testing-base/wiki/streamingsuitebase
For more on Scala self-types, see: https://docs.scala-lang.org/tour/self-types.html
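A minimal, dependency-free sketch of what such a self-type requires. MiniSuite and MiniStreamingBase are hypothetical stand-ins for a ScalaTest suite class and for StreamingSuiteBase; they are not the real types. One detail worth checking in the question's test case: in ScalaTest 3.x FunSuite is a class, not a trait, so it has to be the first parent (extends FunSuite with StreamingSuiteBase) rather than being mixed in after with.

```scala
object SelfTypeDemo {
  // Hypothetical stand-in for a ScalaTest suite class such as FunSuite
  abstract class MiniSuite {
    def suiteName: String
  }

  // Hypothetical stand-in for StreamingSuiteBase: the self-type `self: MiniSuite =>`
  // means any concrete class mixing this trait in must also be a MiniSuite.
  trait MiniStreamingBase { self: MiniSuite =>
    def describe: String = s"streaming checks in $suiteName"
  }

  // The class goes first after `extends`; traits are mixed in with `with`.
  class MyStreamingSuite extends MiniSuite with MiniStreamingBase {
    def suiteName: String = "MyStreamingSuite"
  }

  // Neither of these compiles:
  // class NoSuite extends MiniStreamingBase                   // self-type not satisfied
  // class WrongOrder extends MiniStreamingBase with MiniSuite // a class cannot be mixed in with `with`
}
```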
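You don't need the V: ClassTag context bound. In Scala 2 a context bound is compiled into an implicit parameter that is prepended to the method's existing implicit parameter list, which is why the inherited signature already contains implicit evidence$1: ClassTag[V]. Adding V: ClassTag on top of an explicit ClassTag parameter gives the override an extra implicit parameter, so it no longer matches the method it is supposed to override.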
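A small stdlib-only sketch of that rule in Scala 2. Base and Sub are hypothetical classes shaped like verifyOutput (a context bound plus an explicit implicit list); the override states the ClassTag once, inside the implicit list, and does not repeat the context bound.

```scala
import scala.reflect.ClassTag

object ContextBoundOverrideDemo {
  // Hypothetical base method: [V: ClassTag] plus (implicit ord: Ordering[V]) compiles to
  // a single implicit list (implicit evidence$1: ClassTag[V], ord: Ordering[V]).
  class Base {
    def verify[V: ClassTag](xs: Seq[V])(implicit ord: Ordering[V]): String =
      s"base: ${xs.sorted.mkString(",")}"
  }

  // The override spells out the same implicit list once, without `V: ClassTag`.
  class Sub extends Base {
    override def verify[V](xs: Seq[V])(implicit ct: ClassTag[V], ord: Ordering[V]): String =
      "sub -> " + super.verify(xs)
  }

  // Writing `override def verify[V: ClassTag](xs: Seq[V])(implicit ct: ClassTag[V], ord: Ordering[V])`
  // would add a second ClassTag parameter, and the compiler would report that it overrides nothing.
}
```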
The basic IDE-generated override I get is: