java—使用kafka流获取时间窗口中给定键的最后一个事件

63lcw9qa  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(440)

这个问题在这里已经有答案了

如何发送时间窗口ktable的最终kafka流聚合结果(3个答案)
去年关门了。
我开始使用kstream来使用现有主题中的数据。
我只想在10秒内获得给定id的最后一个事件。我尝试使用以下代码:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MySale> stream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), specificAvroSerde));

stream.selectKey((key, value) -> value.getID())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((value1, value2) -> value2)
    .toStream()
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));

但我最终得到了所有的事件,而不仅仅是最后一个。是否可以使用kstream执行我想要的操作?

avwztpqn

avwztpqn1#

使用 .suppress() 它抑制窗口中的所有中间结果,并仅发射最终结果。

stream.selectKey((key, value) -> value.getID())
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))
    .reduce((value1, value2) -> value2)
    .suppress(Suppressed.untilWindowCloses(unbounded())))  // like this
    .toStream()
    .peek((key, value) -> log.info("key={}, value={}", key, value.toString()))
    .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), specificAvroSerde));

您可以在此处阅读更多内容:https://kafka.apache.org/21/documentation/streams/developer-guide/dsl-api.html#window-最终结果

相关问题