Kafka 是否可以从ktable\kstream中获得前10名?

ljsrvy3e  于 2023-03-17  发布在  Apache
关注(0)|答案(1)|浏览(126)

我有一个主题,其中的String键是一个信号类型,Signal值是一个类,如下所示

public clas Signal {
  public final int deviceId;
  public final int value;
  ...
}

每个设备可以发送随时间上升或下降的信号值,而没有模式。
是否可以将每种类型(主题关键)在所有时间段内具有最大信号value的前10个设备作为KTable<String,Signal>?如果所有信号值都在上升,是否会有所帮助?
如果需要,可以更改主题结构。

n6lpvg4x

n6lpvg4x1#

例如,当值总是增加时,可以使用Kafka Streams。需要创建自己的Top10聚合,该聚合存储前10位并在add调用时更新它:

final var builder = new StreamsBuilder();
    final var topTable = builder
        .table(
            SignalChange.TOPIC_NAME,
            Consumed.with(Serdes.String(), new SignalChange.Serde())
        ).toStream()
        .groupByKey()
        .aggregate(
            () -> new Top10(),
            (k, v, top10) -> top10.add(v),
            Materialized.with(Serdes.String(), new Top10.Serde())
        );

topTable然后可以与请求顶部的任何流结合。

相关问题