我正在尝试让我的应用程序与多个Kafka集群工作。
两个kafka集群都以docker-compose.yml文件开始
version: '3'
services:
zookeeper1:
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./data/zookeeper1/data:/data
- ./data/zookeeper1/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka1
ports:
- "9091:9091"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper1
kafdrop1:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9100:9000"
environment:
KAFKA_BROKERCONNECT: "kafka1:19091"
depends_on:
- kafka1
zookeeper2:
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "3181:3181"
environment:
ZOO_MY_ID: 2
ZOO_PORT: 3181
ZOO_SERVERS: server.1=zookeeper:2888:3888
volumes:
- ./data/zookeeper2/data:/data
- ./data/zookeeper2/datalog:/datalog
kafka2:
image: confluentinc/cp-kafka:5.3.0
hostname: kafka1
ports:
- "9191:9091"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9191
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper2:3181"
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./data/kafka2/data:/var/lib/kafka/data
depends_on:
- zookeeper2
kafdrop2:
image: obsidiandynamics/kafdrop
restart: "no"
ports:
- "9200:9000"
environment:
KAFKA_BROKERCONNECT: "kafka2:19091"
depends_on:
- kafka2
然后在kafka1中创建“第一”主题,在second中创建“第二”主题。在那之后,Kafka准备好消费信息了。
我为第一个Kafka创建了两个独立的java配置:
@EnableKafka
@AutoConfigureBefore(KafkaAutoConfiguration.class)
@Configuration
class FirstKafkaConfig {
@Bean("firstKafkaConsumerFactory")
@ConditionalOnMissingBean
public ConsumerFactory<String, String> firstConsumerFactory() {
Map<String, String> consumerProperties = getCommonProperties();
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new DefaultKafkaConsumerFactory(consumerProperties);
}
private Map<String, String> getCommonProperties() {
Map<String, String> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-group-1");
return consumerProperties;
}
@Bean("firstKafkaContainerFactory")
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactory<String, String> firstKafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(firstConsumerFactory());
factory.setConcurrency(5);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean("firstKafkaProducerFactory")
@ConditionalOnMissingBean
public ProducerFactory<String, String> firstProducerFactory() {
Map<String, String> producerProperties = getCommonProperties();
StringSerializer serializer = new StringSerializer();
return new DefaultKafkaProducerFactory(producerProperties, serializer, serializer);
}
@Bean("firstKafkaTemplate")
@ConditionalOnMissingBean
public KafkaTemplate<String, String> firstKafkaTemplate() {
return new KafkaTemplate<>(firstProducerFactory());
}
}
对于第二个问题:
@EnableKafka
@AutoConfigureBefore(KafkaAutoConfiguration.class)
@AutoConfigureAfter(FirstKafkaConfig.class )
@Configuration
public class SecondKafkaConfig {
@Bean("secondKafkaConsumerFactory")
@ConditionalOnMissingBean
public ConsumerFactory<String, String> secondConsumerFactory() {
Map<String, String> consumerProperties = getCommonProperties();
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new DefaultKafkaConsumerFactory(consumerProperties);
}
private Map<String, String> getCommonProperties() {
Map<String, String> consumerProperties = new HashMap<>();
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9191");
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-group-2");
return consumerProperties;
}
@Bean("secondKafkaContainerFactory")
@ConditionalOnMissingBean
public ConcurrentKafkaListenerContainerFactory<String, String> secondKafkaContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(secondConsumerFactory());
factory.setConcurrency(5);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean("secondKafkaProducerFactory")
@ConditionalOnMissingBean
public ProducerFactory<String, String> secondProducerFactory() {
Map<String, String> producerProperties = getCommonProperties();
StringSerializer serializer = new StringSerializer();
return new DefaultKafkaProducerFactory(producerProperties, serializer, serializer);
}
@Bean("secondKafkaTemplate")
@ConditionalOnMissingBean
public KafkaTemplate<String, String> secondKafkaTemplate() {
return new KafkaTemplate<>(secondProducerFactory());
}
}
最后,我配置了侦听器:
@Slf4j
@Service
public class Consumer {
@KafkaListener(topics = "first", groupId = "custom-group-1", containerFactory = "firstKafkaContainerFactory")
public void consumeFirst(String message) throws IOException {
log.info(String.format("#### -> Consumed message -> %s", message));
}
@KafkaListener(topics = "second", groupId = "custom-group-2", containerFactory = "secondKafkaContainerFactory")
public void consumeSecond(String message) throws IOException {
log.info(String.format("#### -> Consumed message -> %s", message));
}
}
我尽一切努力将第一个kafka bean与第二个分开,甚至在第一个之后加载第二个配置,但当我启动应用程序时,我没有得到任何suchbeandefinitionexception:
***************************
APPLICATION FAILED TO START
***************************
Description:
A component required a bean named 'secondKafkaContainerFactory' that could not be found.
The following candidates were found but could not be injected:
- Bean method 'secondKafkaContainerFactory' in 'SecondKafkaConfig' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory' firstKafkaContainerFactory
Action:
Consider revisiting the entries above or defining a bean named 'secondKafkaContainerFactory' in your configuration.
我做错什么了?
暂无答案!
目前还没有任何答案,快来回答吧!