spring-kafka自定义反序列化程序

llmtgqce  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(411)

我将按照此链接中列出的步骤创建客户反序列化程序。我从kafka收到的消息在json字符串前面有纯文本“log message-”,我希望反序列化程序忽略这个字符串并解析json数据。有办法吗?
应用

@SpringBootApplication
public class TransactionauditServiceApplication {

    public static void main(String[] args) throws InterruptedException {
        new SpringApplicationBuilder(TransactionauditServiceApplication.class).web(false).run(args);
    }

    @Bean
    public MessageListener messageListener() {
        return new MessageListener();
    }

    public static class MessageListener {

        @KafkaListener(topics = "ctp_verbose", containerFactory = "kafkaListenerContainerFactory")
        public void listen(@Payload ConciseMessage message, 
                  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
            System.out.println("Received Messasge in group foo: " + message.getStringValue("traceId") + " partion " + partition);
        }
    }
}

消费者配置

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress:localhost:9092}")
    private String bootstrapAddress;

    @Value(value = "${groupId:audit}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, ConciseMessage> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(ConciseMessage.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, ConciseMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
5anewei6

5anewei61#

写这行字 new JsonDeserializer<>(ConciseMessage.class) ,你只是告诉Kafka你想把信息转换成 ConciseMessage 类型。所以,这并不意味着它是一个自定义反序列化程序。要解决您的问题,您很可能需要自己实现一个反序列化程序,该程序具有剥离文本“logmessage-”的逻辑。

相关问题