我试着写一个简单的测试,但不知道为什么它在我的代码中不起作用。不知道我错过了什么。它似乎从来没有调用触发器到测试中的输出。管道只是发出字符串的聚合输出每秒的每一个窗口。我错过了什么?
object CollectSink {
val collectedResults = mutable.ListBuffer[(Int, String)]()
}
class CollectSink extends SinkFunction[(Int, String)] {
override def invoke(value: (Int, String), context: SinkFunction.Context): Unit = {
CollectSink.collectedResults += value
}
}
class MyTestSuite extends BaseSuite with BeforeAndAfterAll with BeforeAndAfterEach {
val flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(2)
.setNumberTaskManagers(1)
.build
)
override def beforeAll() = {
flinkCluster.before()
}
override def beforeEach() = {
CollectSink.collectedResults.clear()
}
override def afterAll() = {
flinkCluster.after()
}
it should "Simple pipeline emits the expected output" in {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(2)
val sourceDataStream = env.fromCollection(
Seq(
(1, "a"),
(1, "b"),
(2, "c"),
(1, "d"),
(2, "e")
)
)
val collectSink = new CollectSink()
// Sample pipeline that emits aggregated strings
// It emits the result every second for every window of 1 second
sourceDataStream
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](Time.seconds(1)))
.reduce(new ReduceFunction[(Int, String)] {
override def reduce(value1: (Int, String), value2: (Int, String)): (Int, String) = (value1._1, value1._2 + value2._2)
})
.addSink(collectSink)
env.execute()
// Issue here is that it never collects anything somehow, size is always 0
// It seems like the the pipeline never emits anything in this test
CollectSink.collectedResults should have size 1
}
}
参考测试文档:https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/testing/#testing-flink-jobs
1条答案
按热度按时间bvjxkvbb1#
为了保证这个测试产生结果,它必须运行至少一秒钟。由于只有5个事件要处理,这是不可能的。
一旦所有的输入都被处理,具有绑定源的作业就会停止。挂起处理时间计时器不会被调用。
几个可能的解决方法: