ListState TTL is not applied in the test harness on Flink 1.8.2

Posted by lvmkulzt on 2021-06-24 in Flink

I am testing a window function that holds a ListState with TTL enabled.
Window function snippet:

public class CustomWindowFunction extends ProcessWindowFunction<InputPOJO, OutputPOJO, String, TimeWindow> {

  ...

  @Override
  public void open(Configuration config) {
    StateTtlConfig ttlConfig =
        StateTtlConfig.newBuilder(listStateTTl)
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // NOTE: NeverReturnExpired
            .build();
    listStateDescriptor =  new ListStateDescriptor<>("unprocessedItems", InputPOJO.class);
    listStateDescriptor.enableTimeToLive(ttlConfig);
  }

  @Override
  public void process( String key, Context context, Iterable<InputPOJO> windowElements, Collector<OutputPOJO> out) throws Exception {

        ListState<InputPOJO> listState = getRuntimeContext().getListState(listStateDescriptor);

        ....

        Iterator<InputPOJO> iterator;

        // Getting unexpired listStateItems for computation.
        iterator = listState.get().iterator();
        while (iterator.hasNext()) {
            InputPOJO listStateInput = iterator.next();
            System.out.println("There are unexpired elements in listState");

            /**Business Logic to compute result using the unexpired values in listState**/
        }

        /**Business Logic to compute result using the current window elements.*/

        // Adding unProcessed WindowElements to ListState(with TTL)
        // NOTE: processed WindowElements are removed manually.
        iterator = windowElements.iterator();
        while (iterator.hasNext()) {
            System.out.println("unProcessed Item added to ListState.");
            InputPOJO unprocessedItem = iterator.next();
            listState.add(unprocessedItem); // This part gets executed for listStateInput1
        }

    }
    ....
}

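I am running an integration test with a test harness. I want to check the number of items in the ListState once its TTL has expired. Below is a snippet of my test.
Note:
There is a custom allowedLateness, implemented with a custom timer.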

private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;

private CustomWindowFunction customWindowFunction;

@Before
public void setup_testHarness() throws Exception {

    KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;

    TypeInformation<InputPOJO> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion ?

    ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?

    /**
     * Creating windowOperator for the below function
     *
     * <pre>
     *
     *      DataStream<OutputPOJO> OutputPOJOStream =
     *         inputPOJOStream
     *             .keyBy(InputPOJO::getKey)
     *             .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
     *             .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
     *             .process(new CustomWindowFunction(windowListStateTtlMillis));
     * </pre>
     */
    customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));

    WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow>
        operator =
            new WindowOperator<>(
                // setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
                ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
                new TimeWindow.Serializer(),
                // setting .keyBy(InputPOJO::getKey)
                keySelector,
                BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
                stateDesc,
                // setting  .process(new CustomWindowFunction(windowListStateTtlMillis))
                new InternalIterableProcessWindowFunction<>(customWindowFunction),
                // setting .trigger(new CustomTrigger(allowedLateness))
                new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
                0,
                null);

    // Creating testHarness for window operator
    testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);

    // Setup and Open  Test Harness
    testHarness.setup();

    testHarness.open();
}

@Test
public void test_listStateTtl_exclusion() throws Exception {

    int allowedLatenessSeconds = 3;
    int listStateTTL = 10;

    //1. Arrange
    InputPOJO listStateInput1 = new InputPOJO(1,"Arjun");
    InputPOJO listStateInput2 = new InputPOJO(2,"Arun");

    // 2. Act
    // listStateInput1 comes at 1 sec
    testHarness.setProcessingTime(secondsToMillis(1));
    testHarness.processElement(new StreamRecord<>(listStateInput1));

    // Setting current processing time to  1 + 3 = 4 > allowedLateness.
    // Window.process() is called, and window is purged (FIRE_AND_PURGE)
    // Expectation: listStateInput1 is put into listState with TTL (10 secs), before process() ends.
    testHarness.setProcessingTime(secondsToMillis(4));

    // Setting processing time after listStateTTL, ie 4 + listStateTTL(10) + 1 = 15
    // Expectation: listStateInput1 is evicted from the listState  (Fails)
    testHarness.setProcessingTime(secondsToMillis(15));

    // With Thread.sleep() instead, the TTL is applied to listState and listStateInput1 is evicted (passes)
    // Thread.sleep(secondsToMillis(15));

    //Passing listStateInput2 to the test Harness
    testHarness.setProcessingTime(secondsToMillis(16));
    testHarness.processElement(new StreamRecord<>(listStateInput2));

    // Setting processing time after allowedLateness = 16 + 3 + 1 = 20
    testHarness.setProcessingTime(secondsToMillis(20));

    // 3. Assert
     List<StreamRecord<? extends OutputPOJO>> streamRecords = testHarness.extractOutputStreamRecords();
     // Expectation: streamRecords will only contain listStateInput2, since listStateInput1 was evicted.
     // Actual: Getting both listStateInput1 & listStateInput2 in the output.
}

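I noticed that the TTL is not applied when I only advance the processing time. When I run the same function with Thread.sleep(ttl), the result is as expected.
Does ListState TTL use system time for eviction (when using the test harness)?
Is there a way to test ListState TTL with the test harness?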
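
The Thread.sleep() observation is consistent with the TTL timestamps being read from the wall clock rather than from the time set via setProcessingTime(). The sketch below is a toy model in plain Java, not Flink code, that reproduces this symptom under that assumption. TtlList, ManualClock and liveEntriesAfterTimeline are hypothetical names introduced only for illustration; the timeline (write at 4 s, read at 20 s, TTL of 10 s) is taken from the test above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

/** Toy model of a TTL-guarded list; illustrative only, not Flink's implementation. */
public class TtlClockModel {

    /** A list whose entries are hidden once they are older than ttlMillis on the supplied clock. */
    static final class TtlList<T> {
        private final long ttlMillis;
        private final LongSupplier ttlClock;
        private final List<T> values = new ArrayList<>();
        private final List<Long> writeTimes = new ArrayList<>();

        TtlList(long ttlMillis, LongSupplier ttlClock) {
            this.ttlMillis = ttlMillis;
            this.ttlClock = ttlClock;
        }

        /** Stores the value together with the TTL clock's current time. */
        void add(T value) {
            values.add(value);
            writeTimes.add(ttlClock.getAsLong());
        }

        /** Returns only entries that have not expired on the TTL clock. */
        List<T> get() {
            long now = ttlClock.getAsLong();
            List<T> live = new ArrayList<>();
            for (int i = 0; i < values.size(); i++) {
                if (now < writeTimes.get(i) + ttlMillis) {
                    live.add(values.get(i));
                }
            }
            return live;
        }
    }

    /** Mutable clock standing in for the time set through the harness. */
    static final class ManualClock implements LongSupplier {
        private long now;

        void set(long millis) {
            now = millis;
        }

        @Override
        public long getAsLong() {
            return now;
        }
    }

    /** Replays the test timeline and returns how many entries are still visible at 20 s. */
    public static int liveEntriesAfterTimeline(boolean ttlFollowsManualClock) {
        ManualClock processingTime = new ManualClock();
        // Assumption under test: the TTL check may read a different clock than the one being advanced.
        LongSupplier ttlClock = ttlFollowsManualClock ? processingTime : System::currentTimeMillis;
        TtlList<String> list = new TtlList<>(10_000L, ttlClock);

        processingTime.set(4_000L);   // window fires, the unprocessed item is stored
        list.add("listStateInput1");
        processingTime.set(20_000L);  // second window fires, the list is read
        return list.get().size();
    }

    public static void main(String[] args) {
        System.out.println("TTL on wall clock:   " + liveEntriesAfterTimeline(false));
        System.out.println("TTL on manual clock: " + liveEntriesAfterTimeline(true));
    }
}
```

In this model the entry survives when the TTL check reads the wall clock, because almost no real time passes between the write and the read, and it is dropped only when the TTL check shares the manually advanced clock. If the harness in 1.8.2 behaves like the first case, advancing processing time alone would not expire the state, which matches the behaviour described above.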
