SpringKafka:同一主题上的不同json负载

rmbxnbpk  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(418)

我需要在同一个kafka主题上发送不同的json负载(例如,foo、bar、car…),而不使用父类
基于SpringKafka文档,我可以使用 @KafkaListener 在类级别并指定 @KafkaHandler 方法级(doc)

@KafkaListener(topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(Foo foo) {
        ...
    }

    @KafkaHandler
    public void listen(Bar bar) {
        ...
    }

    @KafkaHandler
    public void listen(Car car) {
        ...
    }
}

但我有个例外:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.listener.LoggingErrorHandler - Error while processing: ConsumerRecord(topic = myTopic, partition = 1, offset = 0, CreateTime = 1508859519287, checksum = 3297149058, serialized key size = -1, serialized value size = 13, key = null, value = {"foo":"foo"})
org.springframework.kafka.KafkaException: No method found for class java.util.LinkedHashMap
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getHandlerForPayload(DelegatingInvocableHandler.java:92)
    at org.springframework.kafka.listener.adapter.DelegatingInvocableHandler.getMethodNameFor(DelegatingInvocableHandler.java:147)
    at org.springframework.kafka.listener.adapter.HandlerAdapter.getMethodAsString(HandlerAdapter.java:60)
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72)
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:570)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)

我也尝试过这种方法,但不起作用:

@KafkaListener(topics = "myTopic")
   public void receive(ConsumerRecord<String, ?> payload)
   {
      if (payload.value() instanceof Foo)
      {
         //
      }
      else if (payload.value() instanceof Car)
      {
         //
      }
   }

如何配置生产者和消费者使用springkafka在同一主题上发送不同的json负载?

k97glaaz

k97glaaz1#

在使用json和多方法侦听器时有一个catch-22;我们需要知道的类型路由到正确的方法;我们无法从方法签名推断转换的类型,因为我们还不知道要调用哪个方法。
您需要一个消息转换器,它可以从数据或使用 Header (与Kafka11)。
然后,在转换器转换成正确的类型之后,我们就可以确定要调用哪个方法了。
你的第二次尝试 ConsumerRecord 将包含原始json;您需要使用 ObjectMapper 但是,同样的,你需要一些提示,告诉你你想转换成什么类型(或者用暴力,直到成功)。

相关问题