我用的是Kafka,我要跑放法
Kafka是带有zookeeper的docker容器,我有一个主题叫updaterate
这是放方法运行的方法
@PutMapping("v1/branches/{branchId}/rateUpdate")
public ResponseEntity<Void> rateBranchUpdate(@RequestParam("rate") Double rate,@PathVariable Long branchId) throws JsonProcessingException {
Map<String,String> map=new HashMap<String,String>();
map.put("rate",rate.toString());
map.put("branchId",branchId.toString());
producer.produceUpdateRate(map);
return ResponseEntity.noContent().headers(HeaderUtil.createEntityDeletionAlert(applicationName, true, ENTITY_NAME, branchId.toString())).build();
}
这位是制片人
public class BranchProducer {
private static final String TOPIC = "updateRate";
private static final String groupId="store-service";
private KafkaTemplate<String, String> kafkaTemplate;
private final BranchService branchService;
public BranchProducer(BranchService branchService,KafkaTemplate<String, String> kafkaTemplate) {
this.branchService = branchService;
this.kafkaTemplate=kafkaTemplate;
}
public void produceUpdateRate(Map<String,String> rate) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
kafkaTemplate.send(TOPIC,mapper.writeValueAsString(rate));
}
}
这就是消费者
@Component
public class BranchConsumer {
private final Logger log = LoggerFactory.getLogger(BranchService.class);
private final BranchService branchService;
private static final String TOPIC = "updateRate";
private static final String groupId="store-service";
public BranchConsumer(BranchService branchService) {
this.branchService = branchService;
}
@KafkaListener(topics = TOPIC ,groupId = groupId)
public void processUpdateRating(@Payload Map<String,String> rate)
{
branchService.rateUpdate(Double.parseDouble(rate.get("rate")),Long.parseLong(rate.get("branchId")));
log.debug("done update in queue");
}
}
但当运行代码生产者不发送消息给消费者,我有这个错误
javax.ws.rs.WebApplicationException: com.fasterxml.jackson.core.JsonParseException: processing aborted
at [Source: (GZIPInputStream); line: 1, column: 1130]
at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:110)
at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:634)
at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:586)
at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplicationsInternal(AbstractJerseyEurekaHttpClient.java:200)
at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getDelta(AbstractJerseyEurekaHttpClient.java:172)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient.execute(MetricsCollectingEurekaHttpClient.java:73)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.execute(RedirectingEurekaHttpClient.java:89)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:120)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.DiscoveryClient.getAndUpdateDelta(DiscoveryClient.java:1108)
at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:990)
at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1510)
at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1477)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: processing aborted
at [Source: (GZIPInputStream); line: 1, column: 1130]
at com.netflix.discovery.converters.EurekaJacksonCodec$InstanceInfoDeserializer.deserialize(EurekaJacksonCodec.java:500)
at com.netflix.discovery.converters.EurekaJacksonCodec$InstanceInfoDeserializer.deserialize(EurekaJacksonCodec.java:424)
at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1682)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:977)
at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationDeserializer.deserialize(EurekaJacksonCodec.java:754)
at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationDeserializer.deserialize(EurekaJacksonCodec.java:714)
at com.fasterxml.jackson.databind.ObjectReader._bind(ObjectReader.java:1682)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:977)
at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:824)
at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:791)
at com.fasterxml.jackson.databind.ObjectReader._unwrapAndDeserialize(ObjectReader.java:1858)
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1716)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1228)
at com.netflix.discovery.converters.EurekaJacksonCodec.readValue(EurekaJacksonCodec.java:213)
at com.netflix.discovery.converters.wrappers.CodecWrappers$LegacyJacksonJson.decode(CodecWrappers.java:314)
at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:103)
... 26 common frames omitted
2021-03-09 17:04:04.194 WARN 50381 --- [freshExecutor-0] c.n.d.s.t.d.RetryableEurekaHttpClient : Request execution failed with message: com.fasterxml.jackson.core.JsonParseException: processing aborted
at [Source: (GZIPInputStream); line: 1, column: 1130]
2021-03-09 17:04:04.197 ERROR 50381 --- [freshExecutor-0] c.n.d.s.t.d.RedirectingEurekaHttpClient : Request execution error. endpoint=DefaultEndpoint{ serviceUrl='http://admin:admin@localhost:8761/eureka/}
javax.ws.rs.WebApplicationException: com.fasterxml.jackson.core.JsonParseException: processing aborted
at [Source: (GZIPInputStream); line: 1, column: 18]
at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:110)
at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:634)
at com.sun.jersey.api.client.ClientResponse.getEntity(ClientResponse.java:586)
at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplicationsInternal(AbstractJerseyEurekaHttpClient.java:200)
at com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getDelta(AbstractJerseyEurekaHttpClient.java:172)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.MetricsCollectingEurekaHttpClient.execute(MetricsCollectingEurekaHttpClient.java:73)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.executeOnNewServer(RedirectingEurekaHttpClient.java:118)
at com.netflix.discovery.shared.transport.decorator.RedirectingEurekaHttpClient.execute(RedirectingEurekaHttpClient.java:79)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:120)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.DiscoveryClient.getAndUpdateDelta(DiscoveryClient.java:1108)
at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:990)
at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1510)
at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1477)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: processing aborted
at [Source: (GZIPInputStream); line: 1, column: 18]
at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:805)
at com.netflix.discovery.converters.EurekaJacksonCodec$ApplicationsDeserializer.deserialize(EurekaJacksonCodec.java:791)
at com.fasterxml.jackson.databind.ObjectReader._unwrapAndDeserialize(ObjectReader.java:1858)
at com.fasterxml.jackson.databind.ObjectReader._bindAndClose(ObjectReader.java:1716)
at com.fasterxml.jackson.databind.ObjectReader.readValue(ObjectReader.java:1228)
at com.netflix.discovery.converters.EurekaJacksonCodec.readValue(EurekaJacksonCodec.java:213)
at com.netflix.discovery.converters.wrappers.CodecWrappers$LegacyJacksonJson.decode(CodecWrappers.java:314)
at com.netflix.discovery.provider.DiscoveryJerseyProvider.readFrom(DiscoveryJerseyProvider.java:103)
... 27 common frames omitted
2021-03-09 17:04:04.197 WARN 50381 --- [freshExecutor-0] c.n.d.s.t.d.RetryableEurekaHttpClient : Request execution failed with message: com.fasterxml.jackson.core.JsonParseException: processing aborted
at [Source: (GZIPInputStream); line: 1, column: 18]
2021-03-09 17:04:04.198 INFO 50381 --- [ad | producer-2] org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: DSfA1NpaRFWuUSjrZEhRpg
2021-03-09 17:04:04.198 ERROR 50381 --- [freshExecutor-0] com.netflix.discovery.DiscoveryClient : DiscoveryClient_STORESERVICE/storeservice:f479d0dd76c5282f4e6baa1eeac92abb - was unable to refresh its cache! status = Cannot execute request on any known server
com.netflix.discovery.shared.transport.TransportException: Cannot execute request on any known server
at com.netflix.discovery.shared.transport.decorator.RetryableEurekaHttpClient.execute(RetryableEurekaHttpClient.java:112)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator$7.execute(EurekaHttpClientDecorator.java:152)
at com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient.execute(SessionedEurekaHttpClient.java:77)
at com.netflix.discovery.shared.transport.decorator.EurekaHttpClientDecorator.getDelta(EurekaHttpClientDecorator.java:149)
at com.netflix.discovery.DiscoveryClient.getAndUpdateDelta(DiscoveryClient.java:1108)
at com.netflix.discovery.DiscoveryClient.fetchRegistry(DiscoveryClient.java:990)
at com.netflix.discovery.DiscoveryClient.refreshRegistry(DiscoveryClient.java:1510)
at com.netflix.discovery.DiscoveryClient$CacheRefreshThread.run(DiscoveryClient.java:1477)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2021-03-09 17:04:04.208 DEBUG 50381 --- [ XNIO-1 task-1] asc.foods.store.web.rest.BranchResource : Exit: rateBranchUpdate() with result = <204 NO_CONTENT No Content,[X-storeServiceApp-alert:"storeServiceApp.storeServiceBranch.deleted", X-storeServiceApp-params:"12"]>
2021-03-09 17:04:04.291 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-service] Discovered group coordinator localhost:9092 (id: 2147483646 rack: null)
2021-03-09 17:04:04.297 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-service] Attempt to heartbeat failed for since member id consumer-1-d8241fb3-e51c-4754-ba1d-6f7bb30892c2 is not valid.
2021-03-09 17:04:04.299 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=store-service] Revoking previously assigned partitions [updateRate-0]
2021-03-09 17:04:04.299 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-service] (Re-)joining group
2021-03-09 17:04:04.301 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-service] (Re-)joining group
2021-03-09 17:04:07.312 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=store-service] Successfully joined group with generation 3
2021-03-09 17:04:07.313 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=store-service] Setting newly assigned partitions: updateRate-0
2021-03-09 17:04:07.316 INFO 50381 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=store-service] Setting offset for partition updateRate-0 to the committed offset FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null), epoch=0}}
2021-03-09 17:04:07.337 ERROR 50381 --- [ntainer#0-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: ConsumerRecord(topic = updateRate, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1615302244202, serialized key size = -1, serialized value size = 30, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"branchId":"12","rate":"7.0"})
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void asc.foods.store.kafka.BranchConsumer.processUpdateRating(java.util.Map<java.lang.String, java.lang.String>)]
Bean [asc.foods.store.kafka.BranchConsumer@5682f7d6]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}], failedMessage=GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}], failedMessage=GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:1686)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:1678)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1591)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1532)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1446)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1196)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:955)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:890)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}], failedMessage=GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:314)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1640)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1623)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1578)
... 9 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [java.util.Map] for GenericMessage [payload={"branchId":"12","rate":"7.0"}, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@864f31e, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=updateRate, kafka_receivedTimestamp=1615302244202, kafka_groupId=store-service}]
at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:905)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:304)
... 14 common frames omitted
2021-03-09 17:08:09.557 INFO 50381 --- [trap-executor-0] c.n.d.s.r.aws.ConfigClusterResolver : Resolving eureka endpoints via configuration
此错误表示无法使用传入消息调用侦听器方法
1条答案
按热度按时间dtcbnfnu1#
ProcessUpdateing中的这个错误接受字符串参数,但我有map参数解决方案只是将map改为string
@kafkalistener(topics=topic,groupid=groupid)public void processupdateing(string rate){log.debag(rate);
}