我使用的是spring kafka,如果我不设置concurrentkafkalistenercontainerfactory的并发性,当我将其设置为大于1的数字时,我会得到一个异常:
javax.management.instancealreadyexistsexception:Kafka。consumer:type=app-info,id=客户端3
我的配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(kafkaConfig.getConcurrency());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
属性:
kafka.enable-auto-commit=false
kafka.client-id=client-1
kafka.concurrency=2
1条答案
按热度按时间jucafojl1#
我在github上为此打开了一个问题。设置不同的
client.id
对于每个线程,当前都不支持。作为解决方案,您可以启动一个单独的
KafkaMessageListenerContainer
对于每个(这是ConcurrentMessageListenerContainer
在内部)。编辑
虽然不理想,但可以省略
client.id
Kafka客户端将为每个(consumer-1
,consumer-2
等)