Error when passing messages with Kafka and Spring Boot

mhd8tkvw  posted on 2021-07-23  in  Java

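I am using Kafka and want to run a PUT method.
Kafka runs in a Docker container together with ZooKeeper, and I have a topic named updateRate.
This is the method that the PUT request runs: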

@PutMapping("v1/branches/{branchId}/rateUpdate")
public ResponseEntity<Void> rateBranchUpdate(@RequestParam("rate") Double rate, @PathVariable Long branchId) throws JsonProcessingException {
    Map<String, String> map = new HashMap<String, String>();
    map.put("rate", rate.toString());
    map.put("branchId", branchId.toString());
    producer.produceUpdateRate(map);
    return ResponseEntity.noContent().headers(HeaderUtil.createEntityDeletionAlert(applicationName, true, ENTITY_NAME, branchId.toString())).build();
}

This is the producer:

public class BranchProducer {

    private static final String TOPIC = "updateRate";

    private static final String groupId = "store-service";

    private KafkaTemplate<String, String> kafkaTemplate;

    private final BranchService branchService;

    public BranchProducer(BranchService branchService, KafkaTemplate<String, String> kafkaTemplate) {
        this.branchService = branchService;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void produceUpdateRate(Map<String, String> rate) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        kafkaTemplate.send(TOPIC, mapper.writeValueAsString(rate));
    }
}

This is the consumer:

@Component
public class BranchConsumer {

    private final Logger log = LoggerFactory.getLogger(BranchService.class);
    private final BranchService branchService;
    private static final String TOPIC = "updateRate";
    private static final String groupId = "store-service";

    public BranchConsumer(BranchService branchService) {
        this.branchService = branchService;
    }

    @KafkaListener(topics = TOPIC, groupId = groupId)
    public void processUpdateRating(@Payload Map<String, String> rate) {
        branchService.rateUpdate(Double.parseDouble(rate.get("rate")), Long.parseLong(rate.get("branchId")));
        log.debug("done update in queue");
    }

}

But when I run the code, the producer's message does not get through to the consumer, and I get this error:

javax.ws.rs.WebApplicationException: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 1130]
    at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:110)
    at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:634)
    at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:586)
    at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplicationsInternal(AbstractJerseyEurekaHttpClient.java:200)
    at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getDelta(AbstractJerseyEurekaHttpClient.java:172)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient.execute(MetricsCollectingEurekaHttpClient.java:73)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.execute(RedirectingEurekaHttpClient.java:89)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:120)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.DiscoveryClient.getAndUpdateDelta(DiscoveryClient.java:1108)
    at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:990)
    at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1510)
    at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1477)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 1130]
    at com.netflix.discovery.converters.EurekaJacksonCodec$InstanceInfoDeserializer.deserialize(EurekaJacksonCodec.java:500)
    at com.netflix.discovery.converters.EurekaJacksonCodec$InstanceInfoDeserializer.deserialize(EurekaJacksonCodec.java:424)
    at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1682)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:977)
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationDeserializer.deserialize(EurekaJacksonCodec.java:754)
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationDeserializer.deserialize(EurekaJacksonCodec.java:714)
    at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1682)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:977)
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:824)
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:791)
    at com.fasterxml.jackson.databind.ObjectReader._unwrapAndDeserialize(ObjectReader.java:1858)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1716)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1228)
    at com.netflix.discovery.converters.EurekaJacksonCodec.readValue(EurekaJacksonCodec.java:213)
    at com.netflix.discovery.converters.wrappers.CodecWrappers$LegacyJacksonJson.decode(CodecWrappers.java:314)
    at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:103)
    ... 26 common frames omitted

2021-03-09 17:04:04.194  WARN 50381 --- [freshExecutor-0] c.n.d.s.t.d.RetryableEurekaHttpClient    : Request execution failed with message: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 1130]
2021-03-09 17:04:04.197 ERROR 50381 --- [freshExecutor-0] c.n.d.s.t.d.RedirectingEurekaHttpClient  : Request execution error. endpoint=DefaultEndpoint{ serviceUrl='http://admin:admin@localhost:8761/eureka/}

javax.ws.rs.WebApplicationException: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 18]
    at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:110)
    at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:634)
    at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:586)
    at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplicationsInternal(AbstractJerseyEurekaHttpClient.java:200)
    at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getDelta(AbstractJerseyEurekaHttpClient.java:172)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient.execute(MetricsCollectingEurekaHttpClient.java:73)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.executeOnNewServer(RedirectingEurekaHttpClient.java:118)
    at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.execute(RedirectingEurekaHttpClient.java:79)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:120)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.DiscoveryClient.getAndUpdateDelta(DiscoveryClient.java:1108)
    at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:990)
    at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1510)
    at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1477)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 18]
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:805)
    at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:791)
    at com.fasterxml.jackson.databind.ObjectReader._unwrapAndDeserialize(ObjectReader.java:1858)
    at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1716)
    at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1228)
    at com.netflix.discovery.converters.EurekaJacksonCodec.readValue(EurekaJacksonCodec.java:213)
    at com.netflix.discovery.converters.wrappers.CodecWrappers$LegacyJacksonJson.decode(CodecWrappers.java:314)
    at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:103)
    ... 27 common frames omitted

2021-03-09 17:04:04.197  WARN 50381 --- [freshExecutor-0] c.n.d.s.t.d.RetryableEurekaHttpClient    : Request execution failed with message: com.fasterxml.jackson.core.JsonParseException: processing aborted
 at [Source: (GZIPInputStream); line: 1, column: 18]
2021-03-09 17:04:04.198  INFO 50381 --- [ad | producer-2] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-2] Cluster ID: DSfA1NpaRFWuUSjrZEhRpg
2021-03-09 17:04:04.198 ERROR 50381 --- [freshExecutor-0] com.netflix.discovery.DiscoveryClient    : DiscoveryClient_STORESERVICE/storeservice:f479d0dd76c5282f4e6baa1eeac92abb - was unable to refresh its cache! status = Cannot execute request on any known server

com.netflix.discovery.shared.transport.TransportException: Cannot execute request on any known server
    at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:112)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
    at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
    at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
    at com.netflix.discovery.DiscoveryClient.getAndUpdateDelta(DiscoveryClient.java:1108)
    at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:990)
    at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1510)
    at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1477)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2021-03-09 17:04:04.208 DEBUG 50381 --- [  XNIO-1 task-1] asc.foods.store.web.rest.BranchResource  : Exit: rateBranchUpdate() with result = <204 NO_CONTENT No Content,[X-storeServiceApp-alert:"storeServiceApp.storeServiceBranch.deleted", X-storeServiceApp-params:"12"]>
2021-03-09 17:04:04.291  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2021-03-09 17:04:04.297  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] Attempt to heartbeat failed for since member id consumer-1-d8241fb3-e51c-4754-ba1d-6f7bb30892c2 is not valid.
2021-03-09 17:04:04.299  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] Revoking previously assigned partitions [updateRate-0]
2021-03-09 17:04:04.299  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] (Re-)joining group
2021-03-09 17:04:04.301  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] (Re-)joining group
2021-03-09 17:04:07.312  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] Successfully joined group with generation 3
2021-03-09 17:04:07.313  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] Setting newly assigned partitions: updateRate-0
2021-03-09 17:04:07.316  INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=store-service] Setting offset for partition updateRate-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2021-03-09 17:04:07.337 ERROR 50381 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler   : Error while processing: ConsumerRecord(topic = updateRate, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1615302244202, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"branchId":"12","rate":"7.0"})

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void asc.foods.store.kafka.BranchConsumer.processUpdateRating(java.util.Map<java.lang.String, java.lang.String>)]
Bean [asc.foods.store.kafka.BranchConsumer@5682f7d6]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}], failedMessage=GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}], failedMessage=GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1686)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1678)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1591)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1532)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1446)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1196)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:955)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:890)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}], failedMessage=GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:314)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1640)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1623)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1578)
    ... 9 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:905)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:304)
    ... 14 common frames omitted

2021-03-09 17:08:09.557  INFO 50381 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver      : Resolving eureka endpoints via configuration

This error says that the listener method could not be invoked with the incoming message.

dtcbnfnu1#

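The error is in processUpdateRating: the listener receives the payload as a String, but I had declared a Map parameter. The solution is simply to change the Map to a String:

@KafkaListener(topics = TOPIC, groupId = groupId)
public void processUpdateRating(String rate) {
    log.debug(rate);
}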
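
If the listener still needs the values as a map after its parameter becomes a String, one option is to parse the JSON text by hand with Jackson, the same library the producer uses to serialize it. The following is only a minimal sketch: the class name RatePayloadParser and its parse method are hypothetical helpers, not part of the original code, and it assumes jackson-databind is on the classpath (the stack trace suggests it is).

```java
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.io.IOException;
import java.util.Map;

// Hypothetical helper, not part of the original project.
public class RatePayloadParser {

    // ObjectMapper is thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Turns the raw JSON text delivered by Kafka into a Map<String, String>.
    public static Map<String, String> parse(String json) throws IOException {
        return MAPPER.readValue(json, new TypeReference<Map<String, String>>() {});
    }

    public static void main(String[] args) throws IOException {
        // Same payload as the failed record in the log above.
        Map<String, String> rate = parse("{\"branchId\":\"12\",\"rate\":\"7.0\"}");
        double value = Double.parseDouble(rate.get("rate"));
        long branchId = Long.parseLong(rate.get("branchId"));
        System.out.println(branchId + " -> " + value);
    }
}
```

Inside the listener, the String parameter could be passed to parse(...) and the resulting values handed to branchService.rateUpdate(...) as in the original consumer. The listener would then have to handle or declare the IOException.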
