java—如何使用spring提供的kafka API在一个消费组中创建多个消费者

dba5bblo  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(1049)

我正在尝试在一个消费者组中创建多个消费者以进行并行处理,因为我们有大量的消息流入。我用的是Spring Boot和Kafka模板。如何在SpringBoot应用程序的单个示例中创建属于单个使用者组的多个使用者?使用@kafkalistener注解多个方法是否会创建多个使用者?

gfttwv5a

gfttwv5a1#

是的,那个 @KafkaListener 将为您创建多个消费者。
有了它,您可以将它们配置为使用同一主题并属于同一组。kafka协调器将把分区分发给用户。
尽管主题中只有一个分区,但并发不会发生:单个分区在单个线程中处理。
另一个选择是配置 concurrency 同样的,一些消费者将根据 concurrency <-> partition 州。

jm2pwxwz

jm2pwxwz2#

你必须使用 ConcurrentMessageListenerContainer . 它委托给一个或多个 KafkaMessageListenerContainer 示例提供多线程消费。

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(10);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
}

setconcurrency(10)创建10 KafkaMessageListenerContainer 示例。每个示例都获得一定数量的分区。这取决于创建主题时配置的分区数。
一些准备步骤:

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

private final static String BOOTSTRAP_ADDRESS = "localhost:9092";
private final static String CONSUMER_GROUP = "consumer-group-1";
private final static String TOPIC = "test-topic";

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@KafkaListener(topics = TOPIC, containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload String message) {
    logger.info(message);
}

public void start() {
    try {
        Thread.sleep(5000L);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }

    for (int i = 0; i < 10; i++) {
        kafkaTemplate.send(TOPIC, i, String.valueOf(i), "Message " + i);
    }

    logger.info("All message are sent");
}

如果运行上面的方法,您可以看到 KafkaMessageListenerContainer 示例处理放入该示例所服务的分区的消息。添加thread.sleep()以等待使用者初始化。

2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-4-C-1] r.s.c.KafkaConsumersDemo                 : Message 5
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-6-C-1] r.s.c.KafkaConsumersDemo                 : Message 7
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-7-C-1] r.s.c.KafkaConsumersDemo                 : Message 8
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-9-C-1] r.s.c.KafkaConsumersDemo                 : Message 1
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-0-C-1] r.s.c.KafkaConsumersDemo                 : Message 0
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-8-C-1] r.s.c.KafkaConsumersDemo                 : Message 9
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-3-C-1] r.s.c.KafkaConsumersDemo                 : Message 4
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-2-C-1] r.s.c.KafkaConsumersDemo                 : Message 3
2020-07-01 15:48:34.801  INFO 201566 --- [ntainer#0-1-C-1] r.s.c.KafkaConsumersDemo                 : Message 2
2020-07-01 15:48:34.800  INFO 201566 --- [ntainer#0-5-C-1] r.s.c.KafkaConsumersDemo                 : Message 6
wvyml7n5

wvyml7n53#

正如@salavat yalalo所建议的那样,我把我的Kafka集装箱工厂 ConcurrentKafkaListenerContainerFactory . 在@kafkalistenere方法中,我添加了一个名为concurrency的选项,它接受一个整数作为一个字符串,该字符串指示要跨越的使用者的数量,如下所示

@KafakListener(concurrency ="4", containerFactory="concurrentKafkaListenerContainerFactory(bean name of the factory)",..other optional values)
public void topicConsumer(Message<MyObject> myObject){
//.....
}

运行时,我看到在一个消费者组中创建了4个消费者。

相关问题