kafka:consumerapi:如果在组中运行(按顺序),则回归测试失败

thtygnil  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(346)

我已经使用消费者api实现了一个kafka应用程序。我用流api实现了两个回归测试:
测试快乐路径:通过从测试中生成应用程序将使用的数据(到应用程序正在侦听的输入主题中),应用程序将生成测试将使用的数据(到输出主题中),并根据预期的输出数据进行验证。
测试错误路径:行为同上。尽管这一次应用程序将把数据生成输出主题,而测试将使用应用程序的错误主题,并根据预期的错误输出进行验证。
我的代码和回归测试代码位于预期目录结构下的同一项目下。两个时间(对于两个测试)数据都应该由应用程序端的同一侦听器获取。
问题是:
当我单独(手动)执行测试时,每个测试都是通过的。然而,如果我一起执行它们,但顺序(例如:gradle clean build),只有第一个测试通过。第二个测试在测试端使用者轮询数据之后失败,并且在一段时间之后它放弃了找不到任何数据。
观察:
从调试来看,似乎第一次一切都完美地工作了(测试端和应用程序端的生产者和消费者)。但是,在第二次测试期间,应用程序端使用者似乎没有接收到任何数据(似乎测试端生产者正在生成数据,但不能确定),因此没有数据生成到错误主题中。
到目前为止,我尝试了:
经过调查,我的理解是我们正在进入比赛状态,为了避免这种情况,我发现了如下建议:
使用@dirtiescontext(classmode=dirtiescontext.classmode.after\u每个\u测试\u方法)
在每次测试后撕掉代理(请参阅代理上的“.destry()”)
每个测试使用不同的主题名
我申请了所有这些,但仍然无法恢复我的问题。
我在这里提供代码供阅读。任何见解都是值得赞赏的。
第一次测试的代码(测试错误路径):

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.ERROR_TOPIC
        },

        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }
)
public class AbstractIntegrationFailurePathTest {
    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedFailurePathKafkaBroker;

    //To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate;

    //To read from output error
    @Autowired
    protected Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer;

    //Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedFailurePathKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}

.

@TestConfiguration
   public  class AdapterStreamFailurePathTestConfig {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @Value("${spring.kafka.adapter.application-id}")
    private String applicationId;

    @Value("${spring.kafka.adapter.group-id}")
    private String groupId;

    //Producer of records that the program consumes
    @Bean
    public Map<String, Object> sendEmailCmdProducerConfigs() {
        Map<String, Object> results = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        results.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.serializer().getClass());
        results.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.INPUT_VALUE_SERDE.serializer().getClass());
        return results;
    }

    @Bean
    public ProducerFactory<PreferredMediaMsgKey, SendEmailCmd> inputProducerFactory() {
        return new DefaultKafkaProducerFactory<>(sendEmailCmdProducerConfigs());
    }

    @Bean
    public KafkaTemplate<PreferredMediaMsgKey, SendEmailCmd> inputProducerTemplate() {
        return new KafkaTemplate<>(inputProducerFactory());
    }

    //Consumer of the error output, generated by the program
    @Bean
    public Map<String, Object> outputErrorConsumerConfig() {
        Map<String, Object> props = KafkaTestUtils.consumerProps(
                applicationId, Boolean.TRUE.toString(), embeddedKafkaBroker);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.KEY_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                AdapterStreamProperties.Constants.ERROR_VALUE_SERDE.deserializer().getClass()
                        .getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }

    @Bean
    public Consumer<PreferredMediaMsgKey, ErrorCmd> outputErrorConsumer() {
        DefaultKafkaConsumerFactory<PreferredMediaMsgKey, ErrorCmd> rpf =
                new DefaultKafkaConsumerFactory<>(outputErrorConsumerConfig());
        return rpf.createConsumer(groupId, "notification-failure");
    }

}

.

@RunWith(SpringRunner.class)
@SpringBootTest(classes = AdapterStreamFailurePathTestConfig.class)
@ActiveProfiles(profiles = "errtest")
public class ErrorPath400Test extends AbstractIntegrationFailurePathTest {

@Autowired
private DataGenaratorForErrorPath400Test datagen;

@Mock
private AdapterHttpClient httpClient;

@Autowired
private ErroredEmailCmdDeserializer erroredEmailCmdDeserializer;

@Before
public void setup() throws InterruptedException {

    Mockito.when(httpClient.callApi(Mockito.any()))
            .thenReturn(
                    new GenericResponse(
                            400,
                            TestConstants.ERROR_MSG_TO_CHK));
    Mockito.when(httpClient.createURI(Mockito.any(),Mockito.any(),Mockito.any())).thenCallRealMethod();

    inputProducerTemplate.send(
            projectProerties.getInputTopic(),
            datagen.getKey(),
            datagen.getEmailCmdToProduce());
    System.out.println("producer: "+ projectProerties.getInputTopic());

    subscribe(outputErrorConsumer , projectProerties.getErrorTopic(), 0);

}

@Test
public void testWithError() throws InterruptedException, InvalidProtocolBufferException, TextFormat.ParseException {

    ConsumerRecords<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd> records;
    List<ConsumerRecord<PreferredMediaMsgKeyBuf.PreferredMediaMsgKey, ErrorCommandBuf.ErrorCmd>> outputListOfErrors = new ArrayList<>();

    int attempt = 0;
    int expectedRecords = 1;
    do {
        records = KafkaTestUtils.getRecords(outputErrorConsumer);
        records.forEach(outputListOfErrors::add);
        attempt++;
    } while (attempt < expectedRecords && outputListOfErrors.size() < expectedRecords);

    //Verify the recipient event stream size
    Assert.assertEquals(expectedRecords, outputListOfErrors.size());

    //Validate output

}

@After
public void tearDown() {
    outputErrorConsumer.close();
    embeddedFailurePathKafkaBroker.destroy();
}

}
第二次测试在结构上几乎相同。尽管这次测试端使用者是从应用程序端输出主题(而不是错误主题)消费的。我给消费者,经纪人,制片人,主题起了不同的名字。比如:

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false,
        topics = {
                AdapterStreamProperties.Constants.INPUT_TOPIC,
                AdapterStreamProperties.Constants.OUTPUT_TOPIC
        },
        brokerProperties = {
                "listeners=PLAINTEXT://localhost:9092",
                "port=9092",
                "log.dir=/tmp/data/logs",
                "auto.create.topics.enable=true",
                "delete.topic.enable=true"
        }

)
public class AbstractIntegrationSuccessPathTest {
    private final int retryLimit = 0;

    @Autowired
    protected EmbeddedKafkaBroker embeddedKafkaBroker;

    //To produce data
    @Autowired
    protected KafkaTemplate<PreferredMediaMsgKey,SendEmailCmd> sendEmailCmdProducerTemplate;

    //To read from output regular topic
    @Autowired
    protected Consumer<PreferredMediaMsgKey, NotifiedEmailCmd> ouputConsumer;

    //Service to execute notification-preference
    @Autowired
    protected AdapterStreamProperties projectProerties;

    protected void subscribe(Consumer consumer, String topic, int attempt) {
        try {
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, topic);
        } catch (ComparisonFailure ex) {
            if (attempt < retryLimit) {
                subscribe(consumer, topic, attempt + 1);
            }
        }
    }
}

如果我需要提供更多的信息,请告诉我。,

qxsslcnc

qxsslcnc1#

“端口=9092”
不要使用固定端口;如果不考虑这一点,嵌入式代理将使用一个随机端口;在中设置使用者配置 KafkaTestUtils 指向随机端口。
您不应该在每个测试方法之后弄脏上下文-使用不同的 group.id 对于每个测试和不同的 topic .

dldeef67

dldeef672#

在我的情况下,消费者没有正确关闭。我必须做:

@After
public void tearDown() {
  // shutdown hook to correctly close the streams application
  Runtime.getRuntime().addShutdownHook(new Thread(ouputConsumer::close));
}

解决问题。

相关问题