Kafka Streams loses windows when restoring from the changelog topic

Posted by qlzsbp2j on 2021-06-04 in Kafka

I have a custom ValueTransformer that filters out a message when another message with the same key exists in the next window.
For example:

|------Window1------||------Window2------|
|----------(k1,v1)--||---(k1,v2)---------|

Only (k1,v2) is emitted.
This works correctly until the application restarts and restores the state store from the changelog.
After the restart, both messages are emitted: (k1,v1), (k1,v2)
Transformer code:

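import java.time.Duration;
import java.time.Instant;

import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LastWindowValueTransformer implements ValueTransformerWithKeySupplier<Windowed<String>, RequestAvro, RequestAvro> {

    private static final Logger log = LoggerFactory.getLogger(LastWindowValueTransformer.class);

    // length of the look-ahead range; equal to the 300 s window size
    private final long windowTimeoutSeconds = 300;

    // kept from the original constructor; not used by the transform logic below
    private final Reducer<RequestAvro> reducer;

    public LastWindowValueTransformer(Reducer<RequestAvro> reducer) {
        this.reducer = reducer;
    }

    @Override
    public ValueTransformerWithKey<Windowed<String>, RequestAvro, RequestAvro> get() {
        // diamond on an anonymous class requires Java 9+
        return new ValueTransformerWithKey<>() {
            private WindowStore<String, RequestAvro> reduceResultsStore;

            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
                // the window store materialized by reduce(...) in the DSL
                reduceResultsStore = (WindowStore<String, RequestAvro>) context.getStateStore("REDUCE-RESULTS");
            }

            @Override
            public RequestAvro transform(Windowed<String> readOnlyKey, RequestAvro value) {
                Instant windowEndTime = readOnlyKey.window().endTime();

                // look for the same key in any window whose start lies in [windowEnd, windowEnd + timeout]
                boolean hasKeyInNextWindow;
                try (WindowStoreIterator<RequestAvro> iterator = reduceResultsStore.fetch(
                        readOnlyKey.key(),
                        windowEndTime,
                        windowEndTime.plus(Duration.ofSeconds(windowTimeoutSeconds)))) {
                    hasKeyInNextWindow = iterator.hasNext();
                }

                if (hasKeyInNextWindow) {
                    log.trace("key has value in next window. return null. key: {}", readOnlyKey.key());
                    return null;
                }
                return value;
            }

            @Override
            public void close() {
                log.trace("closing processor...");
            }
        };
    }
}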
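The filtering rule in transform() can be modeled without Kafka to check the intended behavior from the example above. This is a minimal sketch, not the asker's code: the class LastWindowFilterModel and its TreeMap-based "store" are hypothetical stand-ins for the REDUCE-RESULTS window store, window starts are plain epoch milliseconds, and the look-ahead range is treated as inclusive on both ends, the way WindowStore fetch bounds are applied to window start timestamps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of the LastWindowValueTransformer rule:
 * a window's value for a key is emitted only if the same key has no value
 * in the next tumbling window. Not Kafka code.
 */
public class LastWindowFilterModel {

    private final long windowSizeMs;
    // key -> (window start -> reduced value); stands in for the "REDUCE-RESULTS" store
    private final Map<String, TreeMap<Long, String>> store = new HashMap<>();

    public LastWindowFilterModel(long windowSizeMs) {
        this.windowSizeMs = windowSizeMs;
    }

    public void put(String key, long windowStartMs, String value) {
        store.computeIfAbsent(key, k -> new TreeMap<>()).put(windowStartMs, value);
    }

    /** Returns the value if it should be emitted, or null if the key appears in the next window. */
    public String transform(String key, long windowStartMs, String value) {
        long windowEndMs = windowStartMs + windowSizeMs;
        TreeMap<Long, String> windows = store.getOrDefault(key, new TreeMap<>());
        // mirrors fetch(key, windowEnd, windowEnd + windowSize): inclusive range on window start
        boolean hasKeyInNextWindow =
                !windows.subMap(windowEndMs, true, windowEndMs + windowSizeMs, true).isEmpty();
        return hasKeyInNextWindow ? null : value;
    }

    public static void main(String[] args) {
        LastWindowFilterModel model = new LastWindowFilterModel(300_000L);
        model.put("k1", 0L, "v1");       // Window1: [0 s, 300 s)
        model.put("k1", 300_000L, "v2"); // Window2: [300 s, 600 s)

        List<String> emitted = new ArrayList<>();
        for (long start : new long[] {0L, 300_000L}) {
            String out = model.transform("k1", start, model.store.get("k1").get(start));
            if (out != null) {
                emitted.add("(k1," + out + ")");
            }
        }
        System.out.println(emitted); // prints [(k1,v2)]
    }
}
```

With both windows present in the model, only (k1,v2) survives, which is the pre-restart behavior described in the question.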

The DSL code is as follows:

.groupByKey()
// 300 s tumbling windows (advance == size) that accept records up to 300 s late
.windowedBy(
        TimeWindows
                .of(Duration.ofSeconds(300))
                .grace(Duration.ofSeconds(300))
                .advanceBy(Duration.ofSeconds(300))
)
// reduce per key and window; the result is materialized in the "REDUCE-RESULTS" window store
.reduce(
        reducer,
        Materialized
                .<String, PprbZrServiceRequestAvro, WindowStore<Bytes, byte[]>>as("REDUCE-RESULTS")
                .withRetention(Duration.ofSeconds(600)) // must be >= window size + grace
                .withKeySerde(new Serdes.StringSerde())
)
// emit only the final result of each window, once the window has closed
.suppress(
        untilWindowCloses(Suppressed.BufferConfig.unbounded())
                .withName("SUPPRESS-INTERMEDIATE-RESULTS")
)
// returns null when the same key already has a value in the next window
.transformValues(
        lastWindowValueTransformer,
        "REDUCE-RESULTS")
.toStream(
        (k, v) -> {
            log.trace("key: {} Window start: {}, end: {}",
                    k.key(),
                    LocalDateTime.ofInstant(k.window().startTime(), ZoneId.systemDefault())
                            .format(DateTimeFormatter.ISO_DATE_TIME),
                    LocalDateTime.ofInstant(k.window().endTime(), ZoneId.systemDefault())
                            .format(DateTimeFormatter.ISO_DATE_TIME));
            return k.key(); // unwrap the windowed key
        }
)
// drop the records the transformer turned into null
.filter((k, v) -> {
    if (v == null) {
        log.trace("Message with key: {} has null value", k);
        return false;
    } else {
        return true;
    }
})
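The timing implied by these window settings can be sanity-checked with plain arithmetic. This is a minimal sketch assuming epoch-aligned tumbling windows and assuming that suppress(untilWindowCloses(...)) emits a window once stream time reaches window end plus grace; the class WindowTiming and its methods are hypothetical helpers, not Kafka API.

```java
import java.time.Duration;

/**
 * Hypothetical helper reproducing the timing of the DSL configuration:
 * 300 s tumbling windows, 300 s grace, 600 s store retention.
 */
public class WindowTiming {

    static final Duration SIZE = Duration.ofSeconds(300);
    static final Duration GRACE = Duration.ofSeconds(300);
    static final Duration RETENTION = Duration.ofSeconds(600);

    /** Kafka Streams rejects a windowed store whose retention is below size + grace. */
    public static boolean retentionIsValid() {
        return RETENTION.compareTo(SIZE.plus(GRACE)) >= 0;
    }

    /** Stream time at which the window starting at windowStartMs is considered closed. */
    public static long closeTimeMs(long windowStartMs) {
        return windowStartMs + SIZE.toMillis() + GRACE.toMillis();
    }

    /** Start of the following window (tumbling: advance == size). */
    public static long nextWindowStartMs(long windowStartMs) {
        return windowStartMs + SIZE.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(retentionIsValid());     // prints true
        System.out.println(closeTimeMs(0L));        // prints 600000
        System.out.println(nextWindowStartMs(0L));  // prints 300000
    }
}
```

Under these assumptions, Window1 ([0 s, 300 s)) reaches the transformer only once stream time hits 600 s, which is also exactly the configured retention, so the retention setting sits at the minimum the DSL accepts.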

The problem is that after the application restarts and restores the state store from the changelog, this code returns no results:

reduceResultsStore.fetch(
        readOnlyKey.key(),
        windowEndTime,
        windowEndTime.plus(Duration.ofSeconds(windowTimeoutSeconds)));

Why does .fetch not return the message from the next window?
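For reference when reasoning about this fetch call: WindowStore fetch bounds are inclusive and apply to window start timestamps. The sketch below lists which epoch-aligned tumbling-window starts fall in the range [windowEnd, windowEnd + timeout]; the class FetchRange and its method are hypothetical helpers, and the sketch only checks the range arithmetic, it does not model restoration.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: window starts matched by fetch(key, windowEnd, windowEnd + timeout),
 * assuming inclusive bounds on window start and epoch-aligned tumbling windows.
 */
public class FetchRange {

    public static List<Long> matchedWindowStarts(long windowEndMs, long timeoutMs, long windowSizeMs) {
        long from = windowEndMs;
        long to = windowEndMs + timeoutMs;
        // first aligned window start >= from (ceiling division)
        long first = Math.floorDiv(from + windowSizeMs - 1, windowSizeMs) * windowSizeMs;
        List<Long> starts = new ArrayList<>();
        for (long start = first; start <= to; start += windowSizeMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Window1 = [0 s, 300 s) so windowEnd = 300000 ms; timeout = window size = 300 s
        System.out.println(matchedWindowStarts(300_000L, 300_000L, 300_000L));
        // prints [300000, 600000]
    }
}
```

Because both bounds are inclusive, the range covers the start of Window2 and also the start of the window after it; a look-ahead limited to Window2 alone would end one millisecond earlier.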
