所以我在为KafkaReact堆写junit的时候面临一个问题:
我写了:when(argumentmatchers.any()).thenreturn(mono.just(message));
所以我得到了mono.just()的响应,但是如何测试kafkasender.send(mono.just(message))——这个片段?我把它也放在里面了,但是我有个例外。。。让我知道如何设置 Flux<SenderResult<String,String,Map<String,String>>> fluxValue
在junit中…..as send方法返回 Flux<Sender<String,String,Map<String,String>>>
```
Map<String, String> contextMetaData = new HashedMap();
SenderRecord<String, String, Map<String, String>> message = SenderRecord.create(
new ProducerRecord<>("exampleTopic" id, topicdata),
contextMetaData);
kafkaSender.send(Mono.just(message)).then().doOnError(error -> {
throw new BusinessException(ERROR_PUBLISHING_MESSAGE.getCode(), error.getMessage(),
Severity.NON_RETRIABLE,
ConfigReader.getInstance().getAsString(Constants.SELF_SOURCE_SYSTEM),
error.getCause());
}).doOnSuccess(s -> {
logger.info(
"Message Sent Successfully to " + "example"+ " TOPIC...");
}).doOnCancel(() -> close()).doOnNext(r -> {
logger.trace("Successfully stored ");
}).subscribe();
}
暂无答案!
目前还没有任何答案,快来回答吧!