我正在尝试在一个消费者组中创建多个消费者以进行并行处理,因为我们有大量的消息流入。我用的是Spring Boot和Kafka模板。如何在SpringBoot应用程序的单个示例中创建属于单个使用者组的多个使用者?使用@kafkalistener注解多个方法是否会创建多个使用者?
gfttwv5a1#
是的,那个 @KafkaListener 将为您创建多个消费者。有了它,您可以将它们配置为使用同一主题并属于同一组。kafka协调器将把分区分发给用户。尽管主题中只有一个分区,但并发不会发生:单个分区在单个线程中处理。另一个选择是配置 concurrency 同样的,一些消费者将根据 concurrency <-> partition 州。
@KafkaListener
concurrency
concurrency <-> partition
jm2pwxwz2#
你必须使用 ConcurrentMessageListenerContainer . 它委托给一个或多个 KafkaMessageListenerContainer 示例提供多线程消费。
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
@Bean KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(10); factory.getContainerProperties().setPollTimeout(3000); return factory; }
setconcurrency(10)创建10 KafkaMessageListenerContainer 示例。每个示例都获得一定数量的分区。这取决于创建主题时配置的分区数。一些准备步骤:
@Autowired private KafkaTemplate<String, Object> kafkaTemplate; private final static String BOOTSTRAP_ADDRESS = "localhost:9092"; private final static String CONSUMER_GROUP = "consumer-group-1"; private final static String TOPIC = "test-topic"; @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS); props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactory") public void listen(@Payload String message) { logger.info(message); } public void start() { try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } for (int i = 0; i < 10; i++) { kafkaTemplate.send(TOPIC, i, String.valueOf(i), "Message " + i); } logger.info("All message are sent"); }
如果运行上面的方法,您可以看到 KafkaMessageListenerContainer 示例处理放入该示例所服务的分区的消息。添加thread.sleep()以等待使用者初始化。
2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-4-C-1] r.s.c.KafkaConsumersDemo : Message 5 2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-6-C-1] r.s.c.KafkaConsumersDemo : Message 7 2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-7-C-1] r.s.c.KafkaConsumersDemo : Message 8 2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-9-C-1] r.s.c.KafkaConsumersDemo : Message 1 2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-0-C-1] r.s.c.KafkaConsumersDemo : Message 0 2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-8-C-1] r.s.c.KafkaConsumersDemo : Message 9 2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-3-C-1] r.s.c.KafkaConsumersDemo : Message 4 2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-2-C-1] r.s.c.KafkaConsumersDemo : Message 3 2020-07-01 15:48:34.801 INFO 201566 --- [ntainer#0-1-C-1] r.s.c.KafkaConsumersDemo : Message 2 2020-07-01 15:48:34.800 INFO 201566 --- [ntainer#0-5-C-1] r.s.c.KafkaConsumersDemo : Message 6
wvyml7n53#
正如@salavat yalalo所建议的那样,我把我的Kafka集装箱工厂 ConcurrentKafkaListenerContainerFactory . 在@kafkalistenere方法中,我添加了一个名为concurrency的选项,它接受一个整数作为一个字符串,该字符串指示要跨越的使用者的数量,如下所示
ConcurrentKafkaListenerContainerFactory
@KafakListener(concurrency ="4", containerFactory="concurrentKafkaListenerContainerFactory(bean name of the factory)",..other optional values) public void topicConsumer(Message<MyObject> myObject){ //..... }
运行时,我看到在一个消费者组中创建了4个消费者。
3条答案
按热度按时间gfttwv5a1#
是的,那个
@KafkaListener
将为您创建多个消费者。有了它,您可以将它们配置为使用同一主题并属于同一组。kafka协调器将把分区分发给用户。
尽管主题中只有一个分区,但并发不会发生:单个分区在单个线程中处理。
另一个选择是配置
concurrency
同样的,一些消费者将根据concurrency <-> partition
州。jm2pwxwz2#
你必须使用
ConcurrentMessageListenerContainer
. 它委托给一个或多个KafkaMessageListenerContainer
示例提供多线程消费。setconcurrency(10)创建10
KafkaMessageListenerContainer
示例。每个示例都获得一定数量的分区。这取决于创建主题时配置的分区数。一些准备步骤:
如果运行上面的方法,您可以看到
KafkaMessageListenerContainer
示例处理放入该示例所服务的分区的消息。添加thread.sleep()以等待使用者初始化。wvyml7n53#
正如@salavat yalalo所建议的那样,我把我的Kafka集装箱工厂
ConcurrentKafkaListenerContainerFactory
. 在@kafkalistenere方法中,我添加了一个名为concurrency的选项,它接受一个整数作为一个字符串,该字符串指示要跨越的使用者的数量,如下所示运行时,我看到在一个消费者组中创建了4个消费者。