配置springboot kafka streams单元测试

p5fdfcr1  于 2021-07-13  发布在  Java
关注(0)|答案(1)|浏览(448)

我有一个 Spring Boot + Kafka Streams 的 Maven 应用程序。我使用 spring-boot-starter-parent 2.4.4 作为 Spring Boot 依赖,Kafka Streams 版本为 2.7.0。
我卡在了测试上:尝试从 resources/application.yml、test/resources/application-test.resources 或 test/resources/application.yml 加载应用程序配置时,测试抛出 java.lang.NullPointerException。
我有一个配置类,其中包含注解和字段的getter和setter,这些字段的定义与application.yml中的名称相同

package com.acme.rtc.configuration;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

/**
 * Holder for the topic names declared under the {@code topics} property prefix.
 *
 * <p>Each property has a plain getter/setter pair whose name mirrors the property key.
 */
@ConfigurationProperties(prefix = "topics")
@Component
@Configuration
public class ConfigProps {

    /** Name of the input topic carrying adjustment events. */
    private String MATRIXX_ADJ_EVENT_TOPIC;

    /** Name of the output topic for KGN adjustment events. */
    private String OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC;

    /** Name of the output topic for LEB adjustment events. */
    private String OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC;

    /** Name of the output topic for events matching neither KGN nor LEB. */
    private String EVENTS_NO_MVNO;

    // --- MATRIXX_ADJ_EVENT_TOPIC ---

    public String getMATRIXX_ADJ_EVENT_TOPIC() {
        return this.MATRIXX_ADJ_EVENT_TOPIC;
    }

    public void setMATRIXX_ADJ_EVENT_TOPIC(String topic) {
        this.MATRIXX_ADJ_EVENT_TOPIC = topic;
    }

    // --- OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC ---

    public String getOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC() {
        return this.OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC;
    }

    public void setOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC(String topic) {
        this.OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC = topic;
    }

    // --- OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC ---

    public String getOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC() {
        return this.OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC;
    }

    public void setOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC(String topic) {
        this.OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC = topic;
    }

    // --- EVENTS_NO_MVNO ---

    public String getEVENTS_NO_MVNO() {
        return this.EVENTS_NO_MVNO;
    }

    public void setEVENTS_NO_MVNO(String topic) {
        this.EVENTS_NO_MVNO = topic;
    }

}

我在测试类和应用程序类中都用 @Autowired 注入了这个类:

// Field-injected ConfigProps instance.
@Autowired
ConfigProps cp;

并尝试调用 cp.getBootstrapServerHost(),但在我的测试类中这会抛出空指针异常,而在我的应用程序类中却能正确解析。
我的测试类如下:

package distinct;

import com.acme.rtc.configuration.ConfigProps;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import com.acme.rtc.configuration.KafkaConfiguration;
import com.acme.rtc.configuration.TopologyConfiguration;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.util.List;

import static java.util.Collections.singletonList;
import static java.util.stream.Collectors.toList;
import static org.junit.jupiter.api.Assertions.assertEquals;

@SpringBootTest
@ContextConfiguration(classes = TopologyConfiguration.class)
@SpringJUnitConfig
public class TestWithTopologyTestDriver {
    private TestInputTopic<String, String> inputTopicWrong;
    private TestOutputTopic<String, String> outputTopicWrong;

    private TestInputTopic<String, String> inputTopicRight;
    private TestOutputTopic<String, String> outputTopicRight;
    private TopologyTestDriver topologyTestDriver;

    @Autowired
    ConfigProps configProps;

    @BeforeEach
    public void setUp() {
        KafkaProperties properties = new KafkaProperties();
        properties.setBootstrapServers(singletonList("localhost:9092"));
        KafkaStreamsConfiguration config = new KafkaConfiguration(properties).getStreamsConfig();
        StreamsBuilder sb = new StreamsBuilder();
        Topology topology = new TopologyConfiguration().createTopology(sb);
        topologyTestDriver = new TopologyTestDriver(topology, config.asProperties());
        inputTopicWrong =
                topologyTestDriver.createInputTopic(configProps.getMATRIXX_ADJ_EVENT_TOPIC(), new StringSerializer(),
                        new StringSerializer());
        outputTopicWrong =
                topologyTestDriver.createOutputTopic(configProps.getOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC(), new StringDeserializer(),
                        new StringDeserializer());

        inputTopicRight =
                topologyTestDriver.createInputTopic(configProps.getMATRIXX_ADJ_EVENT_TOPIC(), new StringSerializer(),
                        new StringSerializer());
        outputTopicRight =
                topologyTestDriver.createOutputTopic(configProps.getOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC(), new StringDeserializer(),
                        new StringDeserializer());
    }

    @AfterEach
    public void tearDown() {
        topologyTestDriver.close();
    }
@Test
    void wrongDistinctTopology() {
        testTopology(inputTopicWrong, outputTopicWrong);
    }}

其中 TopologyConfiguration 是我的应用程序中的类,定义如下:

package com.acme.rtc.configuration;

import com.fasterxml.jackson.databind.JsonNode;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.stereotype.Component;

/**
 * Declares the Kafka Streams topology as a Spring bean.
 *
 * <p>Records are read from the input topic named by {@link ConfigProps} and routed by the
 * content of their value: values containing {@code "KGN"} go to the KGN output topic, values
 * containing {@code "LEB"} go to the LEB output topic, and everything else goes to the
 * "no MVNO" topic.
 *
 * <p>This class only consumes {@code ConfigProps}; it is not itself a properties holder, so it
 * carries no {@code @ConfigurationProperties}. {@code @Configuration} already makes it a
 * component, so {@code @Component} is not repeated.
 */
@Configuration
public class TopologyConfiguration {
    @Autowired
    Environment env;

    @Autowired
    ConfigProps configProps;

    /**
     * Adds the branching stream to the given builder.
     *
     * @param streamsBuilder builder that receives the source, the branches and the sinks
     */
    private void acmeStreamsTopology(StreamsBuilder streamsBuilder) {
        System.out.println("ConfigProps.getMattrix: "+configProps.getMATRIXX_ADJ_EVENT_TOPIC());

        KStream<String, String> inputStream =
                streamsBuilder.stream(configProps.getMATRIXX_ADJ_EVENT_TOPIC(), Consumed.with(Serdes.String(), Serdes.String()));

        // Null values match neither marker and fall through to the catch-all branch
        // instead of throwing inside a predicate.
        KStream<String, String>[] branches = inputStream.branch(
                (key, value) -> value != null && value.contains("KGN"),
                (key, value) -> value != null && value.contains("LEB"),
                (key, value) -> true);

        branches[0].to(configProps.getOUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC());
        branches[1].to(configProps.getOUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC());
        branches[2].to(configProps.getEVENTS_NO_MVNO());
    }

    /**
     * Builds the topology on the supplied builder.
     *
     * @param streamsBuilder builder to register the stream on
     * @return the topology produced by {@code streamsBuilder.build()}
     */
    @Bean
    public Topology createTopology(StreamsBuilder streamsBuilder) {
        acmeStreamsTopology(streamsBuilder);
        return streamsBuilder.build();
    }

}

我的 Kafka 配置类:

package com.acme.rtc.configuration;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

/**
 * Supplies the Kafka Streams configuration bean for the application.
 *
 * <p>The bootstrap servers come from the constructor-injected {@link KafkaProperties}.
 */
@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration {

    /** Application id used for the Kafka Streams instance. */
    public static final String APP_ID = "acme-stream-rtc";
    private final KafkaProperties kafkaProperties;

    /**
     * Builds the streams configuration registered under the default streams-config bean name.
     *
     * <p>The method takes no parameters, so it carries only {@code @Bean}; {@code @Autowired}
     * has nothing to inject here.
     *
     * @return configuration holding the application id, thread count, bootstrap servers and
     *         the JSON type-header flag
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration getStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
        return new KafkaStreamsConfiguration(props);
    }

}

我的application.yml有正确的语法等。

# Settings under the `spring` namespace.
spring:
  kafka:
    bootstrap-servers: localhost:9092
  json:
    value:
      default:
        type: true
# NOTE(review): this `kafka:` section is at the top level rather than nested under `spring:`;
# confirm whether it was meant to be `spring.kafka.*` and lost its indentation.
kafka:
  streams:
    properties:
      default:
        value:
          serde: org.springframework.kafka.support.serializer.JsonSerde
      # NOTE(review): `admin:` has no value or children here — confirm whether keys are missing.
      admin:
    security:
      protocol: SSL
    # Store locations and passwords are supplied through ${...} placeholders.
    ssl:
      trust-store-location: ${TRUSTSTORE_LOCATION}
      trust-store-password: ${TRUSTSTORE_PASSWORD}
      key-store-location: ${KEYSTORE_LOCATION}
      key-store-password: ${KEYSTORE_PASSWORD}
      key-password: ${KEY_PASSWORD}

# Topic names; the keys match the field names of the ConfigProps class (prefix "topics").
topics:
  MATRIXX_ADJ_EVENT_TOPIC: input-matrixx-adj-event
  OUTPUT_MNVO_KGN_ADJ_EVENT_TOPIC: output-KGN-adj-event
  OUTPUT_MNVO_LEB_ADJ_EVENT_TOPIC: output-LEB-adj-event
  EVENTS_NO_MVNO: events-no-mvno-spec

我的主类:

package com.acme.rtc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafkaStreams;

/** Entry point of the stream-processing application; Kafka Streams support is enabled here. */
@SpringBootApplication
@EnableKafkaStreams
public class StreamProcessing {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(StreamProcessing.class);
        application.run(args);
    }

}

我不确定在自动注入 ConfigProps 类时是否缺少了某些上下文,或者是否需要给测试类添加更多注解。

tyg4sfes

tyg4sfes1#

对于 JUnit 4,你需要 @RunWith(SpringJUnit4ClassRunner.class) 以及 @ContextConfiguration。
对于junit5,使用 @SpringJUnitConfig .
不过,为了正确加载属性,您需要 @SpringBootTest .
Spring Boot 2.4 使用 JUnit 5。
你不应该在测试类上使用 @ConfigurationProperties。
编辑
我只是测试了一下,没有问题。

/** Minimal configuration class that contributes a single String bean. */
@Configuration
public class Config {

    /**
     * Provides a String bean.
     *
     * @return the literal {@code "str"}
     */
    @Bean
    String str() {
        final String value = "str";
        return value;
    }

}

/** Properties holder for the {@code foo} prefix, exposing a single {@code bar} value. */
@ConfigurationProperties(prefix = "foo")
@Component
public class MyProps {

    /** Value of the {@code bar} property. */
    String bar;

    /** @return the current {@code bar} value */
    public String getBar() {
        return bar;
    }

    /** @param value new {@code bar} value */
    public void setBar(String value) {
        bar = value;
    }

    /** @return a description of the form {@code MyProps [bar=...]} */
    @Override
    public String toString() {
        return String.format("MyProps [bar=%s]", bar);
    }

}

/** Entry point of the sample application. */
@SpringBootApplication
public class So67078244Application {

    /**
     * Starts the Spring application with this class as its primary source.
     *
     * @param args command-line arguments passed through to Spring
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(So67078244Application.class);
        application.run(args);
    }

}

/** Loads the application context and prints the injected {@code MyProps}. */
@SpringBootTest
class So67078244ApplicationTests {

    /** Properties bean injected from the test context. */
    @Autowired
    MyProps props;

    /** Prints the injected properties; running it at all shows that the context started. */
    @Test
    void contextLoads() {
        final MyProps injected = this.props;
        System.out.println(injected);
    }

}
属性配置:
foo.bar=baz

输出:
MyProps [bar=baz]

相关问题