concurrentmessagelistenercontainer未执行并发操作

8e2ybdfx  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(546)

我试图创建一个多线程侦听器,但是所有消息都在同一个线程中执行。运行时,线程id总是相同的,即使kafkalistercontainerfactory(正确地)是我示例化的那个。如果我几乎同时发送7条消息,我会期望前三条消息同时处理,然后下三条消息同时处理,最后一条消息同时处理。我看到的是,第一个过程是完成,然后是第二个,然后是第三个,等等。我是误解了什么,还是只是误解了什么?
这是我的听众:

@Component
public class ExampleKafkaController {
    Log log = Log.getLog(ExampleKafkaController.class);

    @Autowired
    private KafkaListenerContainerFactory kafkaListenerContainerFactory;

    @KafkaListener(topics = "${kafka.example.topic}")
    public void listenForMessage(ConsumerRecord<?, ?> record) {
        log.info("Got record:\n" + record.value());
        System.out.println("Kafka Thread: " + Thread.currentThread());
        System.out.println(kafkaListenerContainerFactory);

        log.info("Waiting...");
        waitSleep(10000);

        log.info("Done!");
    }

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.example.topic}")
    public String topic;    

    public void send(String payload) {
        log.info("sending payload='" + payload + "' to topic='" + topic + "'");
        kafkaTemplate.send(topic, payload);
    }

    private void waitSleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

这是我使用新配置的应用程序:

@SpringBootApplication
@ComponentScan("net.reigrut.internet.services.example.*")
@EntityScan("net.reigrut.internet.services.example.*")
@EnableKafka
@Configuration
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Autowired 
    ConsumerFactory<Integer,String> consumerFactory;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(3);
        System.out.println("===========>" + consumerFactory);
        System.out.println(factory);
        return factory;
    }
}
qzlgjiam

qzlgjiam1#

对于kafka,并发性仅限于主题中的分区数。如果只有一个分区,那么无论容器的并发设置如何,您都只能在一个线程上接收消息。
您应该将分区数设置为大于或等于所需的并发性。如果分区数大于并发数,则分区将分布在使用者线程中。
一个组中只有一个使用者可以使用分区中的数据。

相关问题