I am receiving the following JSON payload from a Kafka producer.
[
  {
    "mcID": "840465",
    "subscriberId": "",
    "sequenceNumber": "",
    "membershipSourceSystem": "",
    "groupNumber": "",
    "userNm": "174988",
    "webGuid": "",
    "interactionSource": "Engage",
    "channel": "Mobile",
    "sessionId": "",
    "deviceId": "",
    "direction": "",
    "injectionTimestamp": "2020-09-25T00:00:00.000-04:00",
    "associateId": "",
    "contactType": "",
    "contactEmail": "",
    "contactFirstName": "SUS",
    "contactMiddletName": "",
    "contactLastName": "HAT",
    "contactDOB": "19540807",
    "outcome": "",
    "notes": "",
    "interactionIntent": {
      "interactionIntent": {
        "registration": {
          "eventTimestamp": "2020-09-25T00:00:00.000-04:00"
        }
      },
      "login": [
        {
          "totalVisits": "1",
          "totalCareTeamProviders": "2",
          "totalTrackersPurchased": "0",
          "eventTimestamp": "2020-09-25T00:00:00.000-04:00"
        }
      ]
    }
  }
]
On the Kafka listener side I get the error below.
2020-12-03 23:45:30.698 [org.springframework.kafka.KafkaListenerEndpointContainer#0-8-C-1] ERROR c.a.e.d.m.e.k.consumer.EventListener -
EEECATLIGHTDIGITALINTERACTIONS KAFKA CONSUMER: error caught in kafka listener: Cannot deserialize instance of `com.anthem.emep.dckr.microsvc.eeecatlightdigitalinteractions.model.Event` out of START_ARRAY token
at [Source: (String)"[{"mcID":"840465","subscriberId":"","sequenceNumber":"","membershipSourceSystem":"","groupNumber":"","userNm":"174988","webGuid":"","interactionSource":"Engage","channel":"Mobile","sessionId":"","deviceId":"","direction":"","injectionTimestamp":"2020-09-25T00:00:00.000-04:00","associateId":"","contactType":"","contactEmail":"","contactFirstName":"SUSAN","contactMiddletName":"","contactLastName":"HATCHER","contactDOB":"19540807","outcome":"","notes":"","interacti"[truncated 238 chars]; line: 1, column: 1]
2020-12-03 23:45:30.803 [org.springframework.kafka.KafkaListenerEndpointContainer#0-10-C-1] ERROR c.a.e.d.m.e.k.consumer.EventListener - error caught in kafka listener: Cannot deserialize instance of `com.anthem.emep.dckr.microsvc.eeecatlightdigitalinteractions.model.Event` out of START_ARRAY token
When does this kind of exception occur? Is it because the object is not mapped correctly? Can anyone help?
Kafka consumer code:
@KafkaListener(topics ="#{'${applicationProperties.kafkapropmap.kafka.topics}'.split('\\\\ ')}")
public HttpStatus receiveMessages(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment) {
    try {
        log.info("event Received = "+ (String) consumerRecord.value());
        ObjectMapper mapper = new ObjectMapper();
        Event event = mapper.readValue(((String) consumerRecord.value()), Event.class);
    } catch(Exception ex) {
        log.error("{} error caught in kafka listener: {}", ApplicationConstant.MEMBER_MSG_HEADER, ex.getMessage());
        acknowledgment.acknowledge();
    }
}
1 Answer
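The payload Kafka delivers is wrapped in [], so it is a JSON array rather than a single object. Jackson hits the START_ARRAY token while you are asking it for a single Event, which is why deserialization fails. You have to map the payload to a list of Event instead.
Try this: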
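A minimal sketch of mapping the array to a list with Jackson's TypeReference. The Event class below is a hypothetical stand-in for the real model (only two fields, unknown properties ignored), and the names ParseEventArray and parseEvents are made up for illustration.

```java
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.List;

public class ParseEventArray {

    // Hypothetical stand-in for the real Event model: only two fields are mapped,
    // and unknown properties are ignored so the sketch stays short.
    @JsonIgnoreProperties(ignoreUnknown = true)
    public static class Event {
        public String mcID;
        public String channel;
    }

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // The payload starts with '[', so the target type is List<Event>, not Event.
    public static List<Event> parseEvents(String json) throws IOException {
        return MAPPER.readValue(json, new TypeReference<List<Event>>() {});
    }

    public static void main(String[] args) throws IOException {
        String payload = "[{\"mcID\":\"840465\",\"channel\":\"Mobile\",\"userNm\":\"174988\"}]";
        for (Event event : parseEvents(payload)) {
            System.out.println(event.mcID + " " + event.channel);
        }
    }
}
```

In the listener, the readValue(..., Event.class) call would be replaced by a call like this and the resulting list iterated. If the producer may send either a single object or an array, Jackson's DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY can be enabled on the mapper so that both shapes deserialize into a list.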