我想使用springkafkaapi实现一个有状态的监听器。
鉴于以下情况:
concurrentkafkalistenercontainerfactory,并发设置为“n”
spring@service类上的@kafkalistener注解方法
然后将创建“n”个kafkamessagelistenercontainers。其中每一个都有自己的kafkaconsumer,因此将有“n”个消费线程-每个消费线程一个。
使用消息时,将使用轮询底层kafkaconsumer的同一线程调用@kafkalistener方法。因为只有一个侦听器示例,所以这个侦听器需要是线程安全的,因为将有来自“n”个线程的并发访问。
我不想考虑并发访问,而是保持侦听器中的状态,我知道只有一个线程才能访问它。
如何使用SpringKafkaAPI为每个kafka使用者创建单独的侦听器?
1条答案
按热度按时间np8igboo1#
你是对的;每个容器都有一个侦听器示例(无论是否配置为
@KafkaListener
或者MessageListener
).一个解决方法是使用一个原型范围
MessageListener
带nKafkaMessageListenerContainer
豆子(每个有一根线)。然后,每个容器将获得自己的侦听器示例。
这是不可能的
@KafkaListener
pojo抽象。不过,使用无状态bean通常更好。
编辑
我找到了另一个使用
SimpleThreadScope
...(容器并发为3)。
它工作得很好:
唯一的问题是作用域无法自行清理(例如,当容器停止运行,线程消失时)。这可能并不重要,取决于您的用例。
要解决这个问题,我们需要容器提供一些帮助(例如,在侦听器线程停止时在其上发布事件)。gh-762。