I have a Kafka listener in a `@Controller`:
@KafkaListener(topics = "${binlookup.kafka.topic.insert}")
public BinInfoJsonDTO listenAndInsert(String binInfoJson, Acknowledgment ack) {
    log.debug("Kafka server: {}", bootstrapServers);
    log.info("Kafka inserting: {}", binInfoJson);
    ObjectMapper mapper = new ObjectMapper();
    BinInfoJsonDTO dto;
    BinLookupError error;
    String responseError;
    try {
        dto = mapper.readValue(binInfoJson, BinInfoJsonDTO.class); // <--- exception happens here
        log.debug("The converted DTO: {}", dto);
        ResponseEntity<BinInfoJsonDTO> response = controller.insertBIN(Utilities.jsonDtoToEntity(dto));
        log.debug("BIN inserted by Kafka. {}", response);
        if (response.getStatusCodeValue() == HttpStatus.OK.value() ||
                response.getStatusCodeValue() == HttpStatus.CREATED.value()) {
            ack.acknowledge(); // manual acknowledge.
            log.debug("Acknowledged.");
        } else {
            responseError = "Unexpected response status code: " + response.getStatusCodeValue();
            log.error(responseError);
            error = new BinLookupError(responseError);
            error.setErrorType(ErrorTypeEnum.UNEXPECTED_STATUS_CODE);
            throw new BinLookupException(error);
        }
    } catch (JsonProcessingException e) {
        log.error("Cannot parse Kafka incoming JSON. ", e);
        error = new BinLookupError(e.getMessage());
        error.setErrorType(ErrorTypeEnum.KAFKA_JSON_FAILURE);
        throw new BinLookupException(error);
    } catch (IOException e1) {
        log.error("I/O exception in Kafka. ", e1);
        error = new BinLookupError(e1.getMessage());
        error.setErrorType(ErrorTypeEnum.IO_EXCEPTION);
        throw new BinLookupException(error);
    }
    return dto;
}
I also have an exception handler in a `@ControllerAdvice`:
@Slf4j
@ControllerAdvice
public class BinLookupExceptionHandler extends ResponseEntityExceptionHandler {
    @ExceptionHandler(value = {BinLookupException.class})
    protected ResponseEntity<BinLookupError> handleBinLookupException(BinLookupException ex) {
        BinLookupError error = ex.getError();
        log.error("Exception handler: {}", error.getOriginalMsg());
        ...
    }
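Now, when I run the application and send malformed JSON from the producer, the exception is thrown but is not caught by the exception handler. The exception is `com.fasterxml.jackson.core.JsonParseException`.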
[NODE=Shenmue3] [ENV=dev] [SRC=UNDEFINED] [TRACE=] [SPAN=] [2018-09-13T09:38:21.035Z] [INFO] [MSG=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] c.p.binlookup.kafka.BinInfoConsumer - Kafka inserting: {{{{{ ]
[NODE=Shenmue3] [ENV=dev] [SRC=UNDEFINED] [TRACE=] [SPAN=] [2018-09-13T09:38:21.119Z] [ERROR] [MSG=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] c.p.binlookup.kafka.BinInfoConsumer - Cannot parse Kafka incoming JSON. com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting double-quote to start field name
at [Source: (String)"{{{{{"; line: 1, column: 3]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:663)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:561)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddName(ReaderBasedJsonParser.java:1757)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:682)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4001)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2992)
at com.xxx.binlookup.kafka.BinInfoConsumer.listenAndInsert(BinInfoConsumer.java:48)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
]
[NODE=Shenmue3] [ENV=dev] [SRC=UNDEFINED] [TRACE=] [SPAN=] [2018-09-13T09:38:21.122Z] [ERROR] [MSG=[org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = insert, partition = 0, offset = 0, CreateTime = 1536831499778, serialized key size = -1, serialized value size = 5, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {{{{{) org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public com.xxx.binlookup.dto.BinInfoJsonDTO com.xxx.binlookup.kafka.BinInfoConsumer.listenAndInsert(java.lang.String,org.springframework.kafka.support.Acknowledgment)' threw exception; nested exception is BinLookupException [error=BinLookupError(originalMsg=Unexpected character ('{' (code 123)): was expecting double-quote to start field name
at [Source: (String)"{{{{{"; line: 1, column: 3], fields=[], errorType=Cannot parse Kafka incoming JSON)]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:267)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:80)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1071)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1051)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:998)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:866)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:724)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.xxx.binlookup.exception.BinLookupException: Original message is: Unexpected character ('{' (code 123)): was expecting double-quote to start field name
at [Source: (String)"{{{{{"; line: 1, column: 3]
at com.xxx.binlookup.kafka.BinInfoConsumer.listenAndInsert(BinInfoConsumer.java:67)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:248)
... 10 common frames omitted
]
The `"Exception handler: {}"` message, which the handler is supposed to log, never appears.
What else do I need to do so that the handler picks up the exception I throw? Thanks.
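A possible reading of the trace above, offered as a sketch rather than a confirmed diagnosis: `@ExceptionHandler` methods in a `@ControllerAdvice` are invoked by Spring MVC while it handles an HTTP request, whereas the stack trace shows `listenAndInsert` being called from `KafkaMessageListenerContainer` on a consumer thread, so the exception ends up in the container's own error handler (`LoggingErrorHandler` in the log). The plain-Java model below illustrates that split. Every type in it (`WebAdvice`, `WebDispatcher`, `ListenerContainer`, the toy `listenAndInsert`) is a hypothetical stand-in, not a Spring class, and the "parsing" is a trivial prefix check rather than real JSON handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java model of the two call paths; every type here is a hypothetical
// stand-in, not a Spring or Spring Kafka class.
class ListenerDispatchSketch {

    // Stand-in for the question's BinLookupException.
    static class BinLookupException extends RuntimeException {
        BinLookupException(String message) {
            super(message);
        }
    }

    // Stand-in for a @ControllerAdvice bean: it only records what it is given.
    static class WebAdvice {
        final List<String> handled = new ArrayList<>();

        void handle(BinLookupException ex) {
            handled.add(ex.getMessage());
        }
    }

    // Stand-in for the web dispatcher: the only caller that consults the advice.
    static class WebDispatcher {
        private final WebAdvice advice;

        WebDispatcher(WebAdvice advice) {
            this.advice = advice;
        }

        void dispatch(String body, Consumer<String> controllerMethod) {
            try {
                controllerMethod.accept(body);
            } catch (BinLookupException ex) {
                advice.handle(ex);
            }
        }
    }

    // Stand-in for the listener container: it knows only its own error list.
    static class ListenerContainer {
        final List<String> errors = new ArrayList<>();

        void deliver(String record, Consumer<String> listener) {
            try {
                listener.accept(record);
            } catch (RuntimeException ex) {
                errors.add(ex.getMessage());
            }
        }
    }

    // Toy listener: rejects input that does not begin with {" (not real JSON parsing).
    static void listenAndInsert(String json) {
        if (!json.startsWith("{\"")) {
            throw new BinLookupException("Cannot parse Kafka incoming JSON: " + json);
        }
    }

    public static void main(String[] args) {
        WebAdvice advice = new WebAdvice();
        WebDispatcher dispatcher = new WebDispatcher(advice);
        ListenerContainer container = new ListenerContainer();

        // Kafka-style delivery: the advice is never consulted.
        container.deliver("{{{{{", ListenerDispatchSketch::listenAndInsert);
        System.out.println("advice calls after Kafka delivery: " + advice.handled.size());
        System.out.println("container errors: " + container.errors.size());

        // Web-style dispatch of the same method: now the advice sees the exception.
        dispatcher.dispatch("{{{{{", ListenerDispatchSketch::listenAndInsert);
        System.out.println("advice calls after web dispatch: " + advice.handled.size());
    }
}
```

In Spring Kafka itself, the per-listener counterpart to the advice is the `KafkaListenerErrorHandler` interface, referenced by bean name through the `errorHandler` attribute of `@KafkaListener`. Whether that fits here depends on the Spring Kafka version in use, which the question does not state.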