I have a custom ValueTransformer that filters out a message when the next window contains another message with the same key.
For example:
|------Window1------||------Window2------|
|----------(k1,v1)--||---(k1,v2)---------|
Only (k1,v2) is emitted.
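To make the intended behaviour concrete, here is a minimal plain-Java sketch of the rule, independent of Kafka Streams. The class `LastWindowFilterSketch`, its nested `Result` type, and the method `keepLastPerKey` are hypothetical names introduced only for illustration; the sketch assumes tumbling windows identified by their start time in seconds.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LastWindowFilterSketch {

    // Hypothetical stand-in for one windowed result: key, value, window start in seconds.
    static final class Result {
        final String key;
        final String value;
        final long windowStartSec;

        Result(String key, String value, long windowStartSec) {
            this.key = key;
            this.value = value;
            this.windowStartSec = windowStartSec;
        }
    }

    // Keeps a result only if the same key has no result in the window
    // that starts exactly windowSizeSec later (the next tumbling window).
    static List<Result> keepLastPerKey(List<Result> results, long windowSizeSec) {
        // Index: key -> set of window starts in which that key has a result.
        Map<String, Set<Long>> startsByKey = new HashMap<>();
        for (Result r : results) {
            startsByKey.computeIfAbsent(r.key, k -> new HashSet<>()).add(r.windowStartSec);
        }
        List<Result> kept = new ArrayList<>();
        for (Result r : results) {
            boolean hasKeyInNextWindow =
                    startsByKey.get(r.key).contains(r.windowStartSec + windowSizeSec);
            if (!hasKeyInNextWindow) {
                kept.add(r);
            }
        }
        return kept;
    }
}
```

With (k1,v1) in the window starting at 0 and (k1,v2) in the window starting at 300, only (k1,v2) is kept, which matches the diagram above.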
This works correctly until the application restarts and the state store is restored from the changelog.
After a restart, the result is: (k1,v1), (k1,v2)
Transformer code:
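import java.time.Duration;
import java.time.Instant;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

// `log` is assumed to be an SLF4J-style logger declared elsewhere (not shown in the original post).
public class LastWindowValueTransformer implements ValueTransformerWithKeySupplier<Windowed<String>, RequestAvro, RequestAvro> {
    // Same length as the window size used in the DSL (300 seconds).
    private long windowTimeoutSeconds = 300;
    // Field declaration was missing in the original post.
    private final Reducer<RequestAvro> reducer;

    public LastWindowValueTransformer(Reducer<RequestAvro> reducer) {
        this.reducer = reducer;
    }

    @Override
    public ValueTransformerWithKey<Windowed<String>, RequestAvro, RequestAvro> get() {
        return new ValueTransformerWithKey<>() {
            private WindowStore<String, RequestAvro> reduceResultsStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                reduceResultsStore = (WindowStore<String, RequestAvro>) context.getStateStore("REDUCE-RESULTS");
            }

            @Override
            public RequestAvro transform(Windowed<String> readOnlyKey, RequestAvro value) {
                Instant windowEndTime = readOnlyKey.window().endTime();
                boolean hasKeyInNextWindow;
                // Look for the same key in the window(s) starting at or after this window's end.
                try (WindowStoreIterator<RequestAvro> iterator = reduceResultsStore.fetch(
                        readOnlyKey.key(),
                        windowEndTime,
                        windowEndTime.plus(Duration.ofSeconds(windowTimeoutSeconds)))) {
                    hasKeyInNextWindow = iterator.hasNext();
                }
                if (hasKeyInNextWindow) {
                    log.trace("key has value in next window. return null. key: {}", readOnlyKey.key());
                    return null;
                } else {
                    return value;
                }
            }

            @Override
            public void close() {
                log.trace("closing processor...");
            }
        };
    }
}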
The DSL code is as follows:
.groupByKey()
.windowedBy(
    TimeWindows
        .of(Duration.ofSeconds(300))
        .grace(Duration.ofSeconds(300))
        .advanceBy(Duration.ofSeconds(300))
)
.reduce(
    reducer,
    Materialized
        .<String, RequestAvro, WindowStore<Bytes, byte[]>>as("REDUCE-RESULTS")
        .withRetention(Duration.ofSeconds(600))
        .withKeySerde(new Serdes.StringSerde())
)
.suppress(
    untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("SUPPRESS-INTERMEDIATE-RESULTS")
)
.transformValues(
    lastWindowValueTransformer,
    "REDUCE-RESULTS")
.toStream(
    (k, v) -> {
        // Log the window boundaries, then drop the window from the key.
        log.trace("key: {} Window start: {}, end: {}",
            k.key(),
            LocalDateTime.ofInstant(k.window().startTime(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME),
            LocalDateTime.ofInstant(k.window().endTime(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME));
        return k.key();
    }
)
.filter((k, v) -> {
    // Values nulled by the transformer are filtered out here.
    if (v == null) {
        log.trace("Message with key: {} has null value", k);
        return false;
    } else {
        return true;
    }
})
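The timing implied by this configuration can be worked out with plain `java.time` arithmetic. The sketch below is only an illustration under two assumptions: tumbling windows are aligned to the epoch, and a window counts as closed once stream time reaches its end plus the grace period. `WindowTimingSketch` and its method names are hypothetical and not part of Kafka Streams.

```java
import java.time.Duration;
import java.time.Instant;

final class WindowTimingSketch {
    // Values copied from the DSL above.
    static final Duration SIZE = Duration.ofSeconds(300);
    static final Duration GRACE = Duration.ofSeconds(300);

    // Start of the epoch-aligned tumbling window that contains the given timestamp.
    static Instant windowStart(Instant ts) {
        long sizeMs = SIZE.toMillis();
        long t = ts.toEpochMilli();
        return Instant.ofEpochMilli(t - Math.floorMod(t, sizeMs));
    }

    // Exclusive end of that window.
    static Instant windowEnd(Instant ts) {
        return windowStart(ts).plus(SIZE);
    }

    // Stream time at which the window is assumed to close: end plus grace.
    static Instant windowClose(Instant ts) {
        return windowEnd(ts).plus(GRACE);
    }
}
```

For a record at second 450, this gives the window [300, 600) and a close time of second 900, so the suppressed result for that window would reach the transformer only after the following window [600, 900) has also ended.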
The problem is that after the application restarts and the state store is restored from the changelog, this code returns no results:
reduceResultsStore
    .fetch(
        readOnlyKey.key(),
        windowEndTime,
        windowEndTime.plus(Duration.ofSeconds(windowTimeoutSeconds))
    );
Why doesn't .fetch return the message from the next window?
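One thing that can be checked without Kafka is whether the requested time range covers the next window at all. The sketch below assumes, as I understand the `WindowStore` Javadoc, that `fetch(key, timeFrom, timeTo)` matches windows by their start timestamp with both bounds inclusive, and that windows are aligned to the epoch. `FetchRangeSketch` and `windowStartsInRange` are hypothetical names used only for this illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

final class FetchRangeSketch {

    // Lists the start times of epoch-aligned tumbling windows whose start
    // falls inside the inclusive range [from, to].
    static List<Instant> windowStartsInRange(Instant from, Instant to, Duration size) {
        long sizeMs = size.toMillis();
        long fromMs = from.toEpochMilli();
        long toMs = to.toEpochMilli();
        // Round fromMs up to the nearest multiple of the window size.
        long firstStart = fromMs + Math.floorMod(-fromMs, sizeMs);
        List<Instant> starts = new ArrayList<>();
        for (long s = firstStart; s <= toMs; s += sizeMs) {
            starts.add(Instant.ofEpochMilli(s));
        }
        return starts;
    }
}
```

For the window [0, 300) the transformer queries the range from second 300 to second 600, which under these assumptions covers the windows starting at seconds 300 and 600. So the range itself should include the next window; this does not explain the empty result after a restore, but it helps rule out the query bounds as the cause.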