我有一个程序,有多个访问共享资源的流。为了避免共享资源的竞争条件,我决定用一个共享的单线程执行器序列化流。我更喜欢用锁来困扰程序的这种方法——我的程序不会承受很重的负载,也不需要时间限制。
其中一个流从kafka读取消息,下面是该流的代码:
@Bean
public KafkaMessageListenerContainer kafkaFlow(DefaultConsumerFactory consumerFactory, ExecutorService myExecutor) {
ContainerProperties containerProperties = new ContainerProperties("myTopic");
... //more properties
containerProperties.setConsumerTaskExecutor(new ConcurrentTaskExecutor(myExecutor));
return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
}
@Bean
public Executor myExecutor() {
return Executors.newSingleThreadScheduledExecutor();
}
问题是,虽然Kafka流运行良好,但其他流似乎从未获得运行的机会。为什么?我在其他jms容器中使用过这种方法,它们工作得很好。。。
提前谢谢。
1条答案
按热度按时间bzzcjhmw1#
Kafka要求消费者定期接受民意调查。否则将撤销主题/分区分配;容器为每个使用者使用一个专用线程。
你需要
poll()
如果您希望为其他目的共享线程,那么使用者可以自己而不是使用侦听器容器,但是您仍然需要确保poll()
内max.poll.interval.ms
为了避免Kafka认为你的消费者已经死了。