考虑一下KStream:
KStream<String, String> inputStream = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String()));
Materialized<String, List<String>, WindowStore<Bytes, byte[]>> with = Materialized.with(Serdes.String(), STRING_LIST_SERDE);
KStream<Windowed<String>, List<String>> outputStream = inputStream
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2)))
.aggregate(
ArrayList::new,
(key, string, aggregate) -> {
aggregate.add(string);
return aggregate;
}, with)
.toStream();
字符串outputStream
将把来自inputStream
info的所有消息聚合成一个消息数组,用于定义的时间范围。现在,另外,我想将消息聚合到一个特定的限制,比如直到列表的大小不超过50。如果列表在聚合过程中变得大于50,我想以某种方式将其拆分为附加列表。
基本上,我希望实现的输出是获得一个消息数组,其大小达到一定的限制(例如50),并且达到特定的时间范围,以先到者为准。
为了实现这一点,我在这里错过了什么?
谢谢你,谢谢
1条答案
按热度按时间bf1o4zei1#
您可以尝试将
KTable
转换为KStream
,然后执行flatMapValues
来拆分列表,类似于以下内容(Kotlin中的代码):字符串
然而,这意味着你要把整个聚合列表加载到内存中--这可能是一个问题,也可能不是,这取决于列表的大小和你的内存设置,但这绝对是要记住的。