如何使用reactor Kafka在消耗无效json时什么都不做?

js5cn81o  于 2023-08-08  发布在  React
关注(0)|答案(1)|浏览(148)

我想达到的目标:
使用Reactor Kafka Consumer,消费消息,并且能够在出现错误时“什么都不做”。
Kafka内部的消息是Person的JSON表示,业务流程是将其转换为person对象。
消息输入示例:{"name":"John", "age":30}
但是,可能会发生以下情况:1 -这是一个令人高兴的情况,JSON格式良好,我可以用objectMapper将其转换为Person。2 -这是一个错误的json,JsonProcessingException,我想只是抛出一个运行时异常3 -json是不完整的,就像{"name":"John", "ag单独在这种情况下,我想忽略这种情况。然而,我不能忽视这条信息。
我尝试了什么:

Flux<Person> consume() {
        return kafkaReceiver.receive()
                .map(oneRecordWithJson -> {
                    try {
                        //regular flow, no issue, just transform an input to an output
                        return objectMapper.readValue(oneRecordWithJson.value(), Person.class);
                    } catch (JsonEOFException | JsonMappingException e) {
                        // the json is broken in the first place, I do not want to do anything in this case.
                        // I do not want to return a default bad object, I do not want to throw any exception
                        // But here, I am required to return
                        LOGGER.error("the input is broken, it is not even a valid json to begin with {}", oneRecordWithJson.value(), e);
                        return null; //<- no!
                    } catch (JsonProcessingException e) {
                        // this is bad, I do want to throw an exception here (which will stop the consumer, but that what is expected)
                        throw new RuntimeException(e);
                    }
                })
                .map(oneCorrectPerson -> doSomething(oneCorrectPerson));
    }

字符串
问:
如何忽略输入格式不正确的情况?而不必抛出异常,返回null,默认对象等。

t2a7ltrp

t2a7ltrp1#

使用Flux.mapNotNull

return kafkaReceiver.receive()
            .mapNotNull(oneRecordWithJson -> {
                // whatever
                return null; // record would be skipped
            })

字符串

相关问题