如何解决SpringKafka中的messageconversionexception?

xyhw6mcr  于 2021-06-07  发布在  Kafka
关注(0)|答案(0)|浏览(471)

我一直在用SpringKafka做我的项目,现在,我面临一个问题。

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [com.polubentcev.messenger.model.subscription.SubscriptionsRequest] for GenericMessage [payload={"userId":"4","pagination":{"offset":0,"quantity":10}}, headers={kafka_offset=23, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@3817b042, kafka_correlationId=[B@13cf505b, kafka_timestampType=CREATE_TIME, kafka_replyTopic=[B@58355ff, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=07, kafka_receivedTimestamp=1548552030765, __TypeId__=[B@7480d2ef}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283) ~[spring-kafka-2.2.3.RELEASE.jar:2.2.3.RELEASE]
    ... 14 common frames omitted

当消息到达我的微服务时,就会抛出这样的异常。
以下是我的配置:

private final Map<String, Object> consumerProps;

public KafkaConsumerConfiguration(@Value("${kafka.bootstrap-servers}") final String bootstrapServers,
                                  @Value("${kafka.consumer.group-id}") final String consumerGroupId) {
    this.consumerProps = Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
            ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId,
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonSerializer.class
    );
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Pair<String, String>>>
kafkaSubscriptionListenerContainerFactory(final ReplyingKafkaTemplate<String, Pair<String, String>, UsersSubscription> kafkaTemplate) {
    final DefaultKafkaConsumerFactory<String, Pair<String, String>> cf = new DefaultKafkaConsumerFactory<>(
            consumerProps, new StringDeserializer(), new JsonDeserializer<>()
    );
    final ConcurrentKafkaListenerContainerFactory<String, Pair<String, String>> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf);
    factory.setReplyTemplate(kafkaTemplate);
    return factory;
}

这很奇怪,因为它必须工作。我试着自己解决,但没有成功,因为我不知道在哪里可以挖得更深。
你能帮我吗?
顺便说一句,我试着在调试器中转换负载 String 使用新的 ObjectMapper<>().readValue() 而且它也很管用 SubscriptionsRequest 包含 @JsonCreator 以及反序列化所需的一切。
解决方法:我添加了 SubscriptionsRequest(String str) 我在其中使用 ObjectMapper . 它工作,但它的丑陋。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题