我正在测试一个窗口函数,它有一个liststate,并且启用了ttl。
窗口函数片段:
public class CustomWindowFunction extends ProcessWindowFunction<InputPOJO, OutputPOJO, String, TimeWindow> {
...
@Override
public void open(Configuration config) {
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(listStateTTl)
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // NOTE: NeverReturnExpired
.build();
listStateDescriptor = new ListStateDescriptor<>("unprocessedItems", InputPOJO.class);
listStateDescriptor.enableTimeToLive(ttlConfig);
}
@Override
public void process( String key, Context context, Iterable<InputPOJO> windowElements, Collector<OutputPOJO> out) throws Exception {
ListState<InputPOJO> listState = getRuntimeContext().getListState(listStateDescriptor);
....
Iterator<InputPOJO> iterator;
// Getting unexpired listStateItems for computation.
iterator = listState.get().iterator();
while (iterator.hasNext()) {
InputPOJO listStateInput = iterator.next();
System.out.println("There are unexpired elements in listState");
/**Business Logic to compute result using the unexpired values in listState**/
}
/**Business Logic to compute result using the current window elements.*/
// Adding unProcessed WindowElements to ListState(with TTL)
// NOTE: processed WindowElements are removed manually.
iterator = windowElements.iterator();
while (iterator.hasNext()) {
System.out.println("unProcessed Item added to ListState.")
InputPOJO unprocessedItem = iterator.next();
listState.add(unprocessedItem); // This part gets executed for listStateInput1
}
}
....
}
我正在使用 testHarness
执行集成测试。当liststate的ttl过期时,我正在测试liststate项计数。下面是我的测试函数片段。
注:
有一个自定义allowedlateness,它是使用自定义计时器实现的。
private OneInputStreamOperatorTestHarness<InputPOJO, OutputPOJO> testHarness;
private CustomWindowFunction customWindowFunction;
@Before
public void setup_testHarness() throws Exception {
KeySelector<InputPOJO, String> keySelector = InputPOJO::getKey;
TypeInformation<InputPOJO> STRING_INT_TUPLE = TypeInformation.of(new TypeHint<InputPOJO>() {}); // Any suggestion ?
ListStateDescriptor<InputPOJO> stateDesc = new ListStateDescriptor<>("window-contents", STRING_INT_TUPLE.createSerializer(new ExecutionConfig())); // Any suggestion ?
/**
* Creating windowOperator for the below function
*
* <pre>
*
* DataStream<OutputPOJO> OutputPOJOStream =
* inputPOJOStream
* .keyBy(InputPOJO::getKey)
* .window(ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)))
* .trigger(new CustomTrigger(triggerAllowedLatenessMillis))
* .process(new CustomWindowFunction(windowListStateTtlMillis));
* </pre>
*/
customWindowFunction = new CustomWindowFunction(secondsToMillis(windowListStateTtlMillis));
WindowOperator<String, InputPOJO, Iterable<InputPOJO>, OutputPOJO, TimeWindow>
operator =
new WindowOperator<>(
// setting .window(ProcessingTimeSessionWindows.withGap(maxTimeout))
ProcessingTimeSessionWindows.withGap(Time.seconds(triggerMaximumTimeoutSeconds)),
new TimeWindow.Serializer(),
// setting .keyBy(InputPOJO::getKey)
keySelector,
BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
stateDesc,
// setting .process(new CustomWindowFunction(windowListStateTtlMillis))
new InternalIterableProcessWindowFunction<>(customWindowFunction),
// setting .trigger(new CustomTrigger(allowedLateness))
new CustomTrigger(secondsToMillis(allowedLatenessSeconds)),
0,
null);
// Creating testHarness for window operator
testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, keySelector, BasicTypeInfo.STRING_TYPE_INFO);
// Setup and Open Test Harness
testHarness.setup();
testHarness.open();
}
@Test
public void test_listStateTtl_exclusion() throws Exception {
int allowedLatenessSeconds = 3;
int listStateTTL = 10;
//1. Arrange
InputPOJO listStateInput1 = new InputPOJO(1,"Arjun");
InputPOJO listStateInput2 = new InputPOJO(2,"Arun");
// 2. Act
// listStateInput1 comes at 1 sec
testHarness.setProcessingTime(secondsToMillis(1));
testHarness.processElement(new StreamRecord<>(listStateInput1));
// Setting current processing time to 1 + 3 = 4 > allowedLateness.
// Window.process() is called, and window is purged (FIRE_AND_PURGE)
// Expectation: listStateInput1 is put into listState with TTL (10 secs), before process() ends.
testHarness.setProcessingTime(secondsToMillis(4));
// Setting processing time after listStateTTL, ie 4 + listStateTTL(10) + 1 = 15
// Expectation: listStateInput1 is evicted from the listState (Fails)
testHarness.setProcessingTime(secondsToMillis(15));
// Using sleep(), the listStateTTL is getting applied to listState and listStateInput1 is evicted (Pass)
//Thread.sleep(secondsToMillis(15))
//Passing listStateInput2 to the test Harness
testHarness.setProcessingTime(secondsToMillis(16));
testHarness.processElement(new StreamRecord<>(listStateInput2));
// Setting processing time after allowedLateness = 16 + 3 + 1 = 20
testHarness.setProcessingTime(secondsToMillis(20));
// 3. Assert
List<StreamRecord<? extends T>> streamRecords = testHarness.extractOutputStreamRecords();
// Expectation: streamRecords will only contain listStateInput2, since listStateInput1 was evicted.
// Actual: Getting both listStateInput1 & listStateInput2 in the output.
}
我注意到ttl不是通过设置处理时间来应用的。当我用thread.sleep(ttl)尝试相同的函数时,结果与预期一样。
liststate ttl是否使用系统时间进行逐出(使用testharness)?
有没有办法用testharness测试liststatettl?
暂无答案!
目前还没有任何答案,快来回答吧!