多个kafka容器工厂bean异常

dm7nw8vv  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(220)

我正在尝试让我的应用程序与多个Kafka集群工作。
两个kafka集群都以docker-compose.yml文件开始

version: '3'
services:
  zookeeper1:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_PORT: 2181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper1/data:/data
      - ./data/zookeeper1/datalog:/datalog
  kafka1:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9091:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9091
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper1:2181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka1/data:/var/lib/kafka/data
    depends_on:
      - zookeeper1
  kafdrop1:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9100:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka1:19091"
    depends_on:
      - kafka1
  zookeeper2:
    image: zookeeper:3.4.9
    hostname: zookeeper
    ports:
      - "3181:3181"
    environment:
      ZOO_MY_ID: 2
      ZOO_PORT: 3181
      ZOO_SERVERS: server.1=zookeeper:2888:3888
    volumes:
      - ./data/zookeeper2/data:/data
      - ./data/zookeeper2/datalog:/datalog
  kafka2:
    image: confluentinc/cp-kafka:5.3.0
    hostname: kafka1
    ports:
      - "9191:9091"
    environment:
      KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka2:19091,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9191
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper2:3181"
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    volumes:
      - ./data/kafka2/data:/var/lib/kafka/data
    depends_on:
      - zookeeper2
  kafdrop2:
    image: obsidiandynamics/kafdrop
    restart: "no"
    ports:
      - "9200:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka2:19091"
    depends_on:
      - kafka2

然后在kafka1中创建“第一”主题,在second中创建“第二”主题。在那之后,Kafka准备好消费信息了。
我为第一个Kafka创建了两个独立的java配置:

@EnableKafka
@AutoConfigureBefore(KafkaAutoConfiguration.class)
@Configuration
class FirstKafkaConfig {

    @Bean("firstKafkaConsumerFactory")
    @ConditionalOnMissingBean
    public ConsumerFactory<String, String> firstConsumerFactory() {
        Map<String, String> consumerProperties = getCommonProperties();
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory(consumerProperties);
    }

    private Map<String, String> getCommonProperties() {
        Map<String, String> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-group-1");
        return consumerProperties;
    }

    @Bean("firstKafkaContainerFactory")
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactory<String, String> firstKafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(firstConsumerFactory());
        factory.setConcurrency(5);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean("firstKafkaProducerFactory")
    @ConditionalOnMissingBean
    public ProducerFactory<String, String> firstProducerFactory() {
        Map<String, String> producerProperties = getCommonProperties();
        StringSerializer serializer = new StringSerializer();
        return new DefaultKafkaProducerFactory(producerProperties, serializer, serializer);
    }

    @Bean("firstKafkaTemplate")
    @ConditionalOnMissingBean
    public KafkaTemplate<String, String> firstKafkaTemplate() {
        return new KafkaTemplate<>(firstProducerFactory());
    }
}

对于第二个问题:

@EnableKafka
@AutoConfigureBefore(KafkaAutoConfiguration.class)
@AutoConfigureAfter(FirstKafkaConfig.class )
@Configuration
public class SecondKafkaConfig {

    @Bean("secondKafkaConsumerFactory")
    @ConditionalOnMissingBean
    public ConsumerFactory<String, String> secondConsumerFactory() {
        Map<String, String> consumerProperties = getCommonProperties();
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return new DefaultKafkaConsumerFactory(consumerProperties);
    }

    private Map<String, String> getCommonProperties() {
        Map<String, String> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9191");
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "custom-group-2");
        return consumerProperties;
    }

    @Bean("secondKafkaContainerFactory")
    @ConditionalOnMissingBean
    public ConcurrentKafkaListenerContainerFactory<String, String> secondKafkaContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
        factory.setConsumerFactory(secondConsumerFactory());
        factory.setConcurrency(5);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    @Bean("secondKafkaProducerFactory")
    @ConditionalOnMissingBean
    public ProducerFactory<String, String> secondProducerFactory() {
        Map<String, String> producerProperties = getCommonProperties();
        StringSerializer serializer = new StringSerializer();
        return new DefaultKafkaProducerFactory(producerProperties, serializer, serializer);
    }

    @Bean("secondKafkaTemplate")
    @ConditionalOnMissingBean
    public KafkaTemplate<String, String> secondKafkaTemplate() {
        return new KafkaTemplate<>(secondProducerFactory());
    }
}

最后,我配置了侦听器:

@Slf4j
@Service
public class Consumer {

    @KafkaListener(topics = "first", groupId = "custom-group-1", containerFactory = "firstKafkaContainerFactory")
    public void consumeFirst(String message) throws IOException {
        log.info(String.format("#### -> Consumed message -> %s", message));
    }

    @KafkaListener(topics = "second", groupId = "custom-group-2", containerFactory = "secondKafkaContainerFactory")
    public void consumeSecond(String message) throws IOException {
        log.info(String.format("#### -> Consumed message -> %s", message));
    }
}

我尽一切努力将第一个kafka bean与第二个分开,甚至在第一个之后加载第二个配置,但当我启动应用程序时,我没有得到任何suchbeandefinitionexception:


***************************

APPLICATION FAILED TO START

***************************

Description:

A component required a bean named 'secondKafkaContainerFactory' that could not be found.

The following candidates were found but could not be injected:
    - Bean method 'secondKafkaContainerFactory' in 'SecondKafkaConfig' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory' firstKafkaContainerFactory

Action:

Consider revisiting the entries above or defining a bean named 'secondKafkaContainerFactory' in your configuration.

我做错什么了?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题