我想达到的目标:
使用Reactor Kafka Consumer,消费消息,并且能够在出现错误时“什么都不做”。
Kafka内部的消息是Person
的JSON表示,业务流程是将其转换为person
对象。
消息输入示例:{"name":"John", "age":30}
个
但是,可能会发生以下情况:1 -这是一个令人高兴的情况,JSON格式良好,我可以用objectMapper将其转换为Person。2 -这是一个错误的json,JsonProcessingException
,我想只是抛出一个运行时异常3 -json是不完整的,就像{"name":"John", "ag
单独在这种情况下,我想忽略这种情况。然而,我不能忽视这条信息。
我尝试了什么:
Flux<Person> consume() {
return kafkaReceiver.receive()
.map(oneRecordWithJson -> {
try {
//regular flow, no issue, just transform an input to an output
return objectMapper.readValue(oneRecordWithJson.value(), Person.class);
} catch (JsonEOFException | JsonMappingException e) {
// the json is broken in the first place, I do not want to do anything in this case.
// I do not want to return a default bad object, I do not want to throw any exception
// But here, I am required to return
LOGGER.error("the input is broken, it is not even a valid json to begin with {}", oneRecordWithJson.value(), e);
return null; //<- no!
} catch (JsonProcessingException e) {
// this is bad, I do want to throw an exception here (which will stop the consumer, but that what is expected)
throw new RuntimeException(e);
}
})
.map(oneCorrectPerson -> doSomething(oneCorrectPerson));
}
字符串
问:
如何忽略输入格式不正确的情况?而不必抛出异常,返回null,默认对象等。
1条答案
按热度按时间t2a7ltrp1#
使用
Flux.mapNotNull
:字符串