I have a test listener:
@KafkaListener(topics = TEST_TOPIC)
public void consume(
        ConsumerRecord<String, Map> consumerRecord,
        @Payload String message) {
}
and a test producer:
@Test
public void throwExceptionTest() throws Exception {
    Exception kafkaException = new Exception("test exception");
    kafkaTemplate.send(TEST_TOPIC, "testKey", kafkaException);
}
I am trying to unit test a Kafka listener error handler:
@Component
@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Logger logger = LoggerFactory.getLogger(KafkaListenerErrorHandler.class);

    /**
     * Handle the error.
     * @param message the spring-messaging message.
     * @param exception the exception the listener threw, wrapped in a
     * {@link ListenerExecutionFailedException}.
     * @return the return value is ignored unless the annotated method has a
     * {@code @SendTo} annotation.
     */
    Object handleError(Message<?> message, ListenerExecutionFailedException exception);

    /**
     * Handle the error.
     * @param message the spring-messaging message.
     * @param exception the exception the listener threw, wrapped in a
     * {@link ListenerExecutionFailedException}.
     * @param consumer the consumer.
     * @return the return value is ignored unless the annotated method has a
     * {@code @SendTo} annotation.
     */
    default Object handleError(Message<?> message, ListenerExecutionFailedException exception,
            Consumer<?, ?> consumer) {
        logger.info(String.format("KafkaListenerErrorHandler:%s", exception.getMessage()));
        return handleError(message, exception);
    }
}
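I am having trouble figuring out how to deliberately throw an error that will trigger this handler.
Any help would be greatly appreciated.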
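To make the goal concrete: as far as I understand, the handler is only invoked when the listener method itself throws, and it has to be referenced by bean name through the `errorHandler` attribute of `@KafkaListener`, which the listener above does not set. Sending an `Exception` object as the record value would not, by itself, make the listener throw. Below is a minimal plain-Java sketch of the flow I am trying to reproduce. It does not use Spring at all: `Listener`, `ErrorHandler`, `RecordingErrorHandler`, `dispatch` and `failingConsume` are hypothetical stand-ins I made up for illustration, and the `RuntimeException` wrapper only imitates `ListenerExecutionFailedException`.

```java
import java.util.ArrayList;
import java.util.List;

public class ErrorHandlerSketch {

    /** Hypothetical stand-in for the annotated listener method (not a Spring type). */
    interface Listener {
        void consume(String payload) throws Exception;
    }

    /** Hypothetical stand-in for the two-argument handleError contract. */
    interface ErrorHandler {
        Object handleError(String payload, Exception wrapped);
    }

    /** Records every invocation so that a test can assert on it. */
    static final class RecordingErrorHandler implements ErrorHandler {
        final List<String> seen = new ArrayList<>();

        @Override
        public Object handleError(String payload, Exception wrapped) {
            // The original failure is carried as the cause of the wrapper.
            seen.add(payload + ":" + wrapped.getCause().getMessage());
            return null;
        }
    }

    /** Imitates the container: call the listener, wrap any failure, delegate to the handler. */
    static void dispatch(Listener listener, ErrorHandler handler, String payload) {
        try {
            listener.consume(payload);
        } catch (Exception e) {
            handler.handleError(payload, new RuntimeException("Listener failed", e));
        }
    }

    /** Listener body that fails on purpose for one marker payload (assumed value "fail"). */
    static void failingConsume(String payload) throws Exception {
        if ("fail".equals(payload)) {
            throw new IllegalStateException("test exception");
        }
    }

    public static void main(String[] args) {
        RecordingErrorHandler handler = new RecordingErrorHandler();
        dispatch(ErrorHandlerSketch::failingConsume, handler, "ok");   // handler is not called
        dispatch(ErrorHandlerSketch::failingConsume, handler, "fail"); // handler is called once
        System.out.println(handler.seen); // prints [fail:test exception]
    }
}
```

In the real test I would expect the equivalent to be a listener that throws for a marker payload, plus a recording handler bean named in `errorHandler`, but I have not confirmed that this is the recommended approach.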