如何对处理时间计时器进行单元测试?

elcex8rz  于 2021-06-26  发布在  Flink
关注(0)|答案(1)|浏览(684)

我在为一个 CoProcessFunction . 在要触发的测试中,是否有方法手动向前推动处理时间 onTimer 电话?

2vuwiymt

2vuwiymt1#

flink提供了测试工具来测试计时器和状态的功能。允许您“控制”时间并验证状态的属性。
下面的代码块是从官方文档中复制的,目的是让您了解如何使用线束。

public class StatefulFlatMapTest {
    private OneInputStreamOperatorTestHarness<Long, Long> testHarness;
    private StatefulFlatMap statefulFlatMapFunction;

    @Before
    public void setupTestHarness() throws Exception {

        //instantiate user-defined function
        statefulFlatMapFunction = new StatefulFlatMapFunction();

        // wrap user defined function into a the corresponding operator
        testHarness = new OneInputStreamOperatorTestHarness<>(new StreamFlatMap<>(statefulFlatMapFunction));

        // optionally configured the execution environment
        testHarness.getExecutionConfig().setAutoWatermarkInterval(50);

        // open the test harness (will also call open() on RichFunctions)
        testHarness.open();
    }

    @Test
    public void testingStatefulFlatMapFunction() throws Exception {

        //push (timestamped) elements into the operator (and hence user defined function)
        testHarness.processElement(2L, 100L);

        //trigger event time timers by advancing the event time of the operator with a watermark
        testHarness.processWatermark(100L);

        //trigger processing time timers by advancing the processing time of the operator directly
        testHarness.setProcessingTime(100L);

        //retrieve list of emitted records for assertions
        assertThat(testHarness.getOutput(), containsInExactlyThisOrder(3L));

        //retrieve list of records emitted to a specific side output for assertions (ProcessFunction only)
        //assertThat(testHarness.getSideOutput(new OutputTag<>("invalidRecords")), hasSize(0))
    }
}

请查看文档,了解有关要包含哪些依赖项、存在哪些线束以及如何使用这些线束的详细信息。

相关问题