我有这样的代码:
@Component
public class PostConsumer {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setConcurrency(10);
return factory;
}
@Autowired
private PostService postService;
@KafkaListener(topics = {"topic1", "topic2"} , containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
postService.sendPost(consumerRecord.value());
}
}
如何并行处理分区主题1和分区主题2?
现在他们正在一个接一个地工作,首先处理topic1->partition0,然后处理topic2->partition0,然后再处理topic1->partition0,等等。
p、 当我把这个方法分成两个@kafkalistener方法时,它是并行工作的,但问题是在主题列表中,我提供了一个巨大的主题列表,不想有100+个@kafkalistener方法。
先谢谢你。
1条答案
按热度按时间ddrv8njm1#
您需要向侦听器容器添加并发性。但是,如果每个主题只有一个分区,那就没用了;在每个主题中需要足够的分区来支持所需的并发性,或者为每个主题使用单独的容器。
我们可以考虑另一种策略,可以选择将主题本身分发给消费者。
打开一个github问题,我们会考虑它。
编辑
您还可以更改默认分区赋值器,例如:
和
和