SpringKafka在主题中查找最新的可用消息

t5zmwmid  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(373)

我有一个Kafka消费者订阅以下主题 MY_TOPIC , MY_UNINTERESTED_TOPIC .
在下面的场景中,我对第二个主题不感兴趣,但是我不得不提到它,因为如果我使用 auto.offset.reset 它可能会影响所有的主题。
MY_TOPIC 主题一发布不同类型的消息: MESSAGE_TYPE_A 以及 MESSAGE_TYPE_B . 这两条消息都是 BaseKafkaMessage (自定义类)具有不同的属性。
现在我只想找到类型为message\u type\u a的最新消息。我该怎么做?
真正的场景是这样的:我在同一主题上发布两种类型的消息。其中一个用于在对该主题和该消息感兴趣的每个使用者中准备本地缓存。如果使用者停止,则当它重新加载时,必须使用最新的 MESSAGE_TYPE_A . MESSAGE_TYPE_B 应该忽略。我不想在kafka上向数据提供者发送通知以再次发布数据,因为所有订阅者都将有许多不必要的工作要做。
我怎样才能得到这个?这可能吗?
我发现https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek 但我不确定这是否是我正在寻找的,或者是否有其他方法可以做到这一点。

2guxujil

2guxujil1#

现在还不清楚这些信息是什么格式的,或者为什么你真的需要它们在同一个主题中。
例如,可以使用不同的avro类型。否则你就得 try-catch 解析两个不同的字节数组(json对象?)
或者可以按类型对主题进行分区,使同一类型的所有消息按相同分区的顺序排列。
但是,没有任何机制可以执行索引查找来获取最近发送的消息。要么从最新的事件开始,得到下一条传入消息,要么从头开始,然后向上扫描,直到在下一个轮询循环中得到0条记录,理论上这是“最近的”
类似auto.offset.reset的内容可能会影响所有主题
这只会影响消费者感兴趣的主题,而不是所有主题。

相关问题