java—如何使用SpringKafka实现有状态消息侦听器?

o2rvlv0m  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(790)

我想使用springkafkaapi实现一个有状态的监听器。
鉴于以下情况:
concurrentkafkalistenercontainerfactory,并发设置为“n”
spring@service类上的@kafkalistener注解方法
然后将创建“n”个kafkamessagelistenercontainers。其中每一个都有自己的kafkaconsumer,因此将有“n”个消费线程-每个消费线程一个。
使用消息时,将使用轮询底层kafkaconsumer的同一线程调用@kafkalistener方法。因为只有一个侦听器示例,所以这个侦听器需要是线程安全的,因为将有来自“n”个线程的并发访问。
我不想考虑并发访问,而是保持侦听器中的状态,我知道只有一个线程才能访问它。
如何使用SpringKafkaAPI为每个kafka使用者创建单独的侦听器?

np8igboo

np8igboo1#

你是对的;每个容器都有一个侦听器示例(无论是否配置为 @KafkaListener 或者 MessageListener ).
一个解决方法是使用一个原型范围 MessageListener 带n KafkaMessageListenerContainer 豆子(每个有一根线)。
然后,每个容器将获得自己的侦听器示例。
这是不可能的 @KafkaListener pojo抽象。
不过,使用无状态bean通常更好。
编辑
我找到了另一个使用 SimpleThreadScope ...

@SpringBootApplication
public class So51658210Application {

    public static void main(String[] args) {
        SpringApplication.run(So51658210Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, ApplicationContext context,
            KafkaListenerEndpointRegistry registry) {
        return args -> {
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
            template.send("so51658210", 0, "", "foo");
            template.send("so51658210", 1, "", "bar");
            template.send("so51658210", 2, "", "baz");
        };
    }

    @Bean
    public ActualListener actualListener() {
        return new ActualListener();
    }

    @Bean
    @Scope("threadScope")
    public ThreadScopedListener listener() {
        return new ThreadScopedListener();
    }

    @Bean
    public static CustomScopeConfigurer scoper() {
        CustomScopeConfigurer configurer = new CustomScopeConfigurer();
        configurer.addScope("threadScope", new SimpleThreadScope());
        return configurer;
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so51658210", 3, (short) 1);
    }

    public static class ActualListener {

        @Autowired
        private ObjectFactory<ThreadScopedListener> listener;

        @KafkaListener(id = "foo", topics = "so51658210")
        public void listen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            this.listener.getObject().doListen(in, partition);
        }

    }

    public static class ThreadScopedListener {

        private void doListen(String in, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println(in + ":"
                    + Thread.currentThread().getName() + ":"
                    + this.hashCode() + ":"
                    + partition);
        }

    }

}

(容器并发为3)。
它工作得很好:

bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2
bar:foo-1-C-1:1678357802:1
foo:foo-0-C-1:1973858124:0
baz:foo-2-C-1:331135828:2

唯一的问题是作用域无法自行清理(例如,当容器停止运行,线程消失时)。这可能并不重要,取决于您的用例。
要解决这个问题,我们需要容器提供一些帮助(例如,在侦听器线程停止时在其上发布事件)。gh-762。

相关问题