spring 如何在特定时间段内将KStream聚合到固定大小的列表?

xggvc2p6  于 2023-08-02  发布在  Spring
关注(0)|答案(1)|浏览(74)

考虑一下KStream:

KStream<String, String> inputStream = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String()));
Materialized<String, List<String>, WindowStore<Bytes, byte[]>> with = Materialized.with(Serdes.String(), STRING_LIST_SERDE);

KStream<Windowed<String>, List<String>> outputStream = inputStream
            .groupByKey()
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2)))
            .aggregate(
                    ArrayList::new,
                    (key, string, aggregate) -> {
                        aggregate.add(string);
                        return aggregate;
                    }, with)
            .toStream();

字符串
outputStream将把来自inputStream info的所有消息聚合成一个消息数组,用于定义的时间范围。现在,另外,我想将消息聚合到一个特定的限制,比如直到列表的大小不超过50。如果列表在聚合过程中变得大于50,我想以某种方式将其拆分为附加列表。
基本上,我希望实现的输出是获得一个消息数组,其大小达到一定的限制(例如50),并且达到特定的时间范围,以先到者为准。
为了实现这一点,我在这里错过了什么?
谢谢你,谢谢

bf1o4zei

bf1o4zei1#

您可以尝试将KTable转换为KStream,然后执行flatMapValues来拆分列表,类似于以下内容(Kotlin中的代码):

val s = streamsBuilder.stream("kafka-topic", Consumed.with(Serdes.String(), Serdes.String()))
val output = s
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(2)))
    .aggregate({ mutableListOf() }, { k: String, str: String, agg: List<String> -> agg.plus(str) })
    .toStream()
    .flatMapValues { strList -> strList.chunked(50) }

字符串
然而,这意味着你要把整个聚合列表加载到内存中--这可能是一个问题,也可能不是,这取决于列表的大小和你的内存设置,但这绝对是要记住的。

相关问题