flink 1.8.2中没有删除testharness计时器

y0u0uwnf  于 2021-06-24  发布在  Flink
关注(0)|答案(0)|浏览(323)

我正在使用testharness测试我的自定义触发器。简化的代码段附在下面:

public class CustomTrigger extends Trigger<InputPOJO, TimeWindow> {

    private final ReducingStateDescriptor<Long> previousTriggerDesc = new ReducingStateDescriptor<>( "previous-trigger", new Max(),LongSerializer.INSTANCE);

    private final long allowedLatenessMillis;

    public CustomTrigger(long allowedLatenessMillis) {
        this.allowedLatenessMillis = allowedLatenessMillis;
    }

    @Override
    public TriggerResult onElement(InputPOJO inputPOJO, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {

        ReducingState<Long> previousTriggerState = ctx.getPartitionedState(previousTriggerDesc);
        Long previousTriggerTime = ctx.getPartitionedState(previousTriggerDesc).get();

        // Remove previous Timer trigger. else it will invoke twice.
        if (previousTriggerTime != null) {
            ctx.deleteProcessingTimeTimer(previousTriggerTime); //NOTE
            System.out.println("deleteProcessingTimeTimer(previousTriggerTime)"+previousTriggerTime); // Invoked
        }

        // register new trigger for current InputPOJO.      
        long currentTriggerTime = ctx.getCurrentProcessingTime() + allowedLatenessMillis;
        ctx.registerProcessingTimeTimer(currentTriggerTime);

        // Update currentTriggerTime in previousTriggerState.
        previousTriggerTimeState.add(currentTriggerTime);

        return TriggerResult.CONTINUE;
    }

    ...
}

在自定义触发器中,我为每个新的inputpojo注册一个新的计时器。当我注册计时器时,我正在删除前一个计时器(基于先前的TimerTriggerTime,保存在简化状态)。
我正在使用下面的代码片段测试计时器计数(以及窗口)。

private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;

private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {

    KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;

    TypeInformation<InputPOJO> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion ?

    ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

    /**
     * Creating windowOperator for the below function
     *
     * <pre>
     *
     *      DataStream<OutputPOJO> OutputPOJOStream =
     *         inputPOJOStream
     *             .keyBy(InputPOJO::getKey)
     *             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *             .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *             .process(new CustomWindowFunction(windowListStateTtlMillis));
     * </pre>
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));

    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow>
        operator =
            new WindowOperator<>(
                // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                new TimeWindow.Serializer(),
                // setting .keyBy(InputPOJO::getKey)
                keySelector,
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                // setting  .process(new CustomWindowFunction(windowListStateTtlMillis))
                new InternalIterableProcessWindowFunction<>(CustomWindowFunction),
                // setting .trigger(new CustomTrigger(allowedLateness))
                new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                0,
                null);

    // Creating testHarness for window operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

    // Setup and Open  Test Harness
    testHarness.setup();

    testHarness.open();
}

@Test
public void test_allowedLateness_extension_on_second_pojo() throws Exception {

    int allowedLatenessSeconds = 3;
    int listStateTTL = 10;

    //1. Arrange
    InputPOJO listStateInput1 = new InputPOJO(1,"Arjun");
    InputPOJO listStateInput2 = new InputPOJO(2,"Arun");

    // 2. Act
    // listStateInput1 comes at 1 sec
    testHarness.setProcessingTime(secondsToMillis(1));
    testHarness.processElement(new StreamRecord<>(listStateInput1));

    // listStateInput2 comes at 2 sec, ie in the allowedLateness period of listStateInput1
    testHarness.setProcessingTime(secondsToMillis(2));
    testHarness.processElement(new StreamRecord<>(listStateInput1));

    // Expectation : listStateInput2 deletes the existing untriggered timer of listStateInput1 and registers a new timer. 
    // Actual: listStateInput2 registered a new timer and the total count is 3.
    // NOTE: 
    // 1. Here I am using SessionWindow, so by default 1 timer would be registered for SessionGap.
    // 2. Second timer should be the InputPOJO registered timer.
     Assert.assertEquals(2, testHarness.numProcessingTimeTimers()); // FAILS

}

在这里,功能, ctx.deleteProcessingTimeTimer(previousTriggerTime); ,正在触发。但是testharness中的timercount仍然显示为3。
是testharness中的bug吗?
请提供一种使用testharness测试计时器计数的方法。
附言:
尽管这看起来像sessionwindow.gap()的典型功能,但我在一个复杂的计算中使用了这个自定义触发器。为简单起见,我将逻辑简化为上述内容。
我正在使用 ListStateDescriptor 创建时 WindowOperator 用于测试线束。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题