@controller kafka侦听器异常未被异常处理程序@controlleradvice捕获

slwdgvem  于 2021-06-06  发布在  Kafka
关注(0)|答案(0)|浏览(353)

我有一个Kafka听众 @Controller :

@KafkaListener(topics = "${binlookup.kafka.topic.insert}")
public BinInfoJsonDTO listenAndInsert(String binInfoJson, Acknowledgment ack) {
    log.debug("Kafka server: {}", bootstrapServers);
    log.info("Kafka inserting: {}", binInfoJson);
    ObjectMapper mapper = new ObjectMapper();
    BinInfoJsonDTO dto;
    BinLookupError error;
    String responseError;
    try {
        dto = mapper.readValue(binInfoJson, BinInfoJsonDTO.class);    // <--- exception happens here
        log.debug("The converted DTO: {}", dto);
        ResponseEntity<BinInfoJsonDTO> response = controller.insertBIN(Utilities.jsonDtoToEntity(dto));
        log.debug("BIN inserted by Kafka. {}", response);
        if (response.getStatusCodeValue() == HttpStatus.OK.value() ||
                response.getStatusCodeValue() == HttpStatus.CREATED.value()) {
            ack.acknowledge(); // manual acknowledge.
            log.debug("Acknowledged.");
        } else {
            responseError = "Unexpected response status code: " + response.getStatusCodeValue();
            log.error(responseError);
            error = new BinLookupError(responseError);
            error.setErrorType(ErrorTypeEnum.UNEXPECTED_STATUS_CODE);
            throw new BinLookupException(error);
        }
    } catch (JsonProcessingException e) {
        log.error("Cannot parse Kafka incoming JSON. ", e);
        error = new BinLookupError(e.getMessage());
        error.setErrorType(ErrorTypeEnum.KAFKA_JSON_FAILURE);
        throw new BinLookupException(error);
    } catch (IOException e1) {
        log.error("I/O exception in Kafka. ", e1);
        error = new BinLookupError(e1.getMessage());
        error.setErrorType(ErrorTypeEnum.IO_EXCEPTION);
        throw new BinLookupException(error);
    }

    return dto;
}

我有一个异常处理程序 @ControllerAdvice :

@Slf4j
@ControllerAdvice
public class BinLookupExceptionHandler extends ResponseEntityExceptionHandler {
    @ExceptionHandler(value = {BinLookupException.class})
    protected ResponseEntity<BinLookupError> handleBinLookupException(BinLookupException ex) {
        BinLookupError error = ex.getError();
        log.error("Exception handler: {}", error.getOriginalMsg());
        ...

}

现在,当我运行应用程序并在producer中输入格式错误的json时,异常会发生,但不会被异常处理程序捕获。例外是 com.fasterxml.jackson.core.JsonParseException .

[NODE=Shenmue3] [ENV=dev] [SRC=UNDEFINED] [TRACE=] [SPAN=] [2018-09-13T09:38:21.035Z] [INFO] [MSG=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] c.p.binlookup.kafka.BinInfoConsumer - Kafka inserting: {{{{{ ]
[NODE=Shenmue3] [ENV=dev] [SRC=UNDEFINED] [TRACE=] [SPAN=] [2018-09-13T09:38:21.119Z] [ERROR] [MSG=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] c.p.binlookup.kafka.BinInfoConsumer - Cannot parse Kafka incoming JSON.  com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting double-quote to start field name
 at [Source: (String)"{{{{{"; line: 1, column: 3]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:663)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:561)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddName(ReaderBasedJsonParser.java:1757)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:682)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2992)
    at com.xxx.binlookup.kafka.BinInfoConsumer.listenAndInsert(BinInfoConsumer.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
]
[NODE=Shenmue3] [ENV=dev] [SRC=UNDEFINED] [TRACE=] [SPAN=] [2018-09-13T09:38:21.122Z] [ERROR] [MSG=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = insert, partition = 0, offset = 0, CreateTime = 1536831499778, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {{{{{) org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public com.xxx.binlookup.dto.BinInfoJsonDTO com.xxx.binlookup.kafka.BinInfoConsumer.listenAndInsert(java.lang.String,org.springframework.kafka.support.Acknowledgment)' threw exception; nested exception is BinLookupException [error=BinLookupError(originalMsg=Unexpected character ('{' (code 123)): was expecting double-quote to start field name
 at [Source: (String)"{{{{{"; line: 1, column: 3], fields=[], errorType=Cannot parse Kafka incoming JSON)]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.xxx.binlookup.exception.BinLookupException: Original message is: Unexpected character ('{' (code 123)): was expecting double-quote to start field name
 at [Source: (String)"{{{{{"; line: 1, column: 3]
    at com.xxx.binlookup.kafka.BinInfoConsumer.listenAndInsert(BinInfoConsumer.java:67)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248)
    ... 10 common frames omitted
]
``` `"Exception handler: {}"` ,作为处理程序中日志的一部分,不存在。
我还应该做些什么来让它知道我抛出的异常?谢谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题