Unit testing KafkaListenerErrorHandler

Posted by cs7cruho on 2021-06-04 in Kafka

I have a test listener:

@KafkaListener(topics = TEST_TOPIC)
public void consume(
        ConsumerRecord<String, Map> consumerRecord,
        @Payload String message) {
}

And a test producer:

@Test
public void throwExceptionTest() throws Exception {
    Exception kafkaException = new Exception("test exception");
    kafkaTemplate.send(TEST_TOPIC, "testKey", kafkaException);
}

I am trying to unit test the Kafka listener error handler:

@Component
@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Logger logger = LoggerFactory.getLogger(KafkaListenerErrorHandler.class);

    /**
     * Handle the error.
     * @param message the spring-messaging message.
     * @param exception the exception the listener threw, wrapped in a
     * {@link ListenerExecutionFailedException}.
     * @return the return value is ignored unless the annotated method has a
     * {@code @SendTo} annotation.
     */
    Object handleError(Message<?> message, ListenerExecutionFailedException exception);

    /**
     * Handle the error.
     * @param message the spring-messaging message.
     * @param exception the exception the listener threw, wrapped in a
     * {@link ListenerExecutionFailedException}.
     * @param consumer the consumer.
     * @return the return value is ignored unless the annotated method has a
     * {@code @SendTo} annotation.
     */
    default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
                               Consumer<?, ?> consumer) {
        logger.info(String.format("KafkaListenerErrorHandler:%s", exception.getMessage()));
        return handleError(message, exception);
    }

}
I am struggling to figure out how to deliberately throw an error that will trigger this handler.
Any help would be greatly appreciated.
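
For illustration only, here is a minimal plain-Java sketch of the idea, not Spring Kafka code. As far as I understand, the handler is invoked when the listener method itself throws (and, in Spring Kafka, only if the listener references the handler bean through the `errorHandler` attribute of `@KafkaListener`), so sending an `Exception` object as the record value would not by itself trigger it. The types `Msg`, `ListenerFailure`, `ErrorHandler`, `Listener` and the `dispatch` method are hypothetical stand-ins that only mirror the shape of `Message`, `ListenerExecutionFailedException` and `KafkaListenerErrorHandler`; the real container does more than this.

```java
// Stand-in for org.springframework.messaging.Message (simplified, hypothetical).
interface Msg<T> {
    T getPayload();
}

// Stand-in for Spring Kafka's ListenerExecutionFailedException (simplified).
class ListenerFailure extends RuntimeException {
    ListenerFailure(String description, Throwable cause) {
        super(description, cause);
    }
}

// Same shape as the single abstract method of KafkaListenerErrorHandler.
@FunctionalInterface
interface ErrorHandler {
    Object handleError(Msg<?> message, ListenerFailure exception);
}

// Hypothetical listener abstraction: code that consumes a payload and may throw.
@FunctionalInterface
interface Listener {
    void consume(String payload);
}

class ErrorHandlerSketch {

    // Test double that records what the handler received.
    static class RecordingHandler implements ErrorHandler {
        Msg<?> lastMessage;
        ListenerFailure lastException;
        int calls;

        @Override
        public Object handleError(Msg<?> message, ListenerFailure exception) {
            calls++;
            lastMessage = message;
            lastException = exception;
            return "handled: " + exception.getCause().getMessage();
        }
    }

    // Imitates the dispatch step: call the listener and, if it throws,
    // wrap the failure and pass it to the error handler.
    static Object dispatch(Msg<String> message, Listener listener, ErrorHandler handler) {
        try {
            listener.consume(message.getPayload());
            return null;
        } catch (RuntimeException e) {
            return handler.handleError(message, new ListenerFailure("Listener threw", e));
        }
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        Msg<String> message = () -> "boom";

        // A listener that fails on purpose; this is what triggers the handler.
        Listener failing = payload -> {
            throw new IllegalStateException("test exception for " + payload);
        };

        Object result = dispatch(message, failing, handler);
        System.out.println(result); // prints "handled: test exception for boom"
    }
}
```

In a real Spring Kafka test the same two options would presumably apply: either call `handleError` directly with a hand-built `Message` and `ListenerExecutionFailedException` (a pure unit test, no broker), or make the `@KafkaListener` method throw for a given payload and assert on what a recording handler bean received.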
