如何并行化kafka消息消费

u4vypkhs  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(423)

我有这样的代码:

@Component
public class PostConsumer {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =           new ConcurrentKafkaListenerContainerFactory<>();
        ...
        factory.setConcurrency(10);    
        return factory;
    }

    @Autowired
    private PostService postService;

    @KafkaListener(topics = {"topic1", "topic2"} , containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {

        postService.sendPost(consumerRecord.value());

    }
}

如何并行处理分区主题1和分区主题2?
现在他们正在一个接一个地工作,首先处理topic1->partition0,然后处理topic2->partition0,然后再处理topic1->partition0,等等。
p、 当我把这个方法分成两个@kafkalistener方法时,它是并行工作的,但问题是在主题列表中,我提供了一个巨大的主题列表,不想有100+个@kafkalistener方法。
先谢谢你。

ddrv8njm

ddrv8njm1#

您需要向侦听器容器添加并发性。但是,如果每个主题只有一个分区,那就没用了;在每个主题中需要足够的分区来支持所需的并发性,或者为每个主题使用单独的容器。
我们可以考虑另一种策略,可以选择将主题本身分发给消费者。
打开一个github问题,我们会考虑它。
编辑
您还可以更改默认分区赋值器,例如:

spring.kafka.consumer.properties.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

@SpringBootApplication
public class Kgh941Application {

    public static void main(String[] args) {
        SpringApplication.run(Kgh941Application.class, args);
    }

    @KafkaListener(id = "kgh941", concurrency = "30",
            topics = {"kgh941a", "kgh941b", "kgh941c", "kgh941d", "kgh941e", "kgh941f", "kgh941g", "kgh941h"})
    public void listen(String in) {

    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry) {
        return args -> {
            System.out.println("Hit enter to get assignments");
            System.in.read();
            MessageListenerContainer container = registry.getListenerContainer("kgh941");
            @SuppressWarnings("unchecked")
            List<KafkaMessageListenerContainer<?, ?>> containers = (List<KafkaMessageListenerContainer<?, ?>>) new DirectFieldAccessor(
                    container).getPropertyValue("containers");
            containers.forEach(c -> System.out.println(c.getAssignedPartitions().size() + ":"
                    + c.getAssignedPartitions()));
        };
    }

    @Bean
    public NewTopic topica() {
        return new NewTopic("kgh941a", 10, (short) 1);
    }

    @Bean
    public NewTopic topicb() {
        return new NewTopic("kgh941b", 10, (short) 1);
    }

    @Bean
    public NewTopic topicc() {
        return new NewTopic("kgh941c", 10, (short) 1);
    }

    @Bean
    public NewTopic topicd() {
        return new NewTopic("kgh941d", 10, (short) 1);
    }

    @Bean
    public NewTopic topice() {
        return new NewTopic("kgh941e", 10, (short) 1);
    }

    @Bean
    public NewTopic topicf() {
        return new NewTopic("kgh941f", 10, (short) 1);
    }

    @Bean
    public NewTopic topicg() {
        return new NewTopic("kgh941g", 10, (short) 1);
    }

    @Bean
    public NewTopic topich() {
        return new NewTopic("kgh941h", 10, (short) 1);
    }

}

3:[kgh941b-0, kgh941h-0, kgh941e-0]
2:[kgh941c-1, kgh941f-1]
2:[kgh941c-4, kgh941f-4]
2:[kgh941c-5, kgh941f-5]
2:[kgh941f-6, kgh941c-6]
2:[kgh941c-7, kgh941f-7]
2:[kgh941c-8, kgh941f-8]
2:[kgh941c-9, kgh941f-9]
3:[kgh941d-0, kgh941a-0, kgh941g-0]
3:[kgh941a-1, kgh941d-1, kgh941g-1]
3:[kgh941a-2, kgh941d-2, kgh941g-2]
3:[kgh941a-3, kgh941g-3, kgh941d-3]
3:[kgh941d-4, kgh941a-4, kgh941g-4]
3:[kgh941a-5, kgh941d-5, kgh941g-5]
3:[kgh941a-6, kgh941d-6, kgh941g-6]
3:[kgh941a-7, kgh941g-7, kgh941d-7]
3:[kgh941d-8, kgh941a-8, kgh941g-8]
3:[kgh941d-9, kgh941g-9, kgh941a-9]
3:[kgh941e-1, kgh941b-1, kgh941h-1]
3:[kgh941b-2, kgh941e-2, kgh941h-2]
3:[kgh941b-3, kgh941e-3, kgh941h-3]
3:[kgh941b-4, kgh941h-4, kgh941e-4]
3:[kgh941e-5, kgh941b-5, kgh941h-5]
3:[kgh941b-6, kgh941e-6, kgh941h-6]
3:[kgh941b-7, kgh941e-7, kgh941h-7]
3:[kgh941b-8, kgh941h-8, kgh941e-8]
3:[kgh941e-9, kgh941b-9, kgh941h-9]
2:[kgh941c-0, kgh941f-0]
2:[kgh941f-2, kgh941c-2]
2:[kgh941c-3, kgh941f-3]

相关问题