我目前的开发环境如下。
Spring Boot2.x
SpringKafka2.5.5
Kafka2.5.1版
生产者/消费者密钥序列化器/反序列化器=字符串(反)序列化器.class
生产者/消费者值序列化程序/反序列化程序=json(反)序列化程序.class
生产者发送kafka消息时,自定义dto(或pojo)转换为 JSON String
打字通过 ObjectMapper.writeValueAsString(customDto)
.
生产工厂<string,string>
kafkatemplate<string,string>
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, msg); // The message is a JSON String variable made with ObjectMapper.
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record);
当一个 JSON String
消息在消费者中被接收,它被转换成相应的dto(或pojo)通过 ObjectMapper.readValue(message, CustomDto.class)
收到了。
消费者工厂<string,string>
作为参考,自定义dto的类型不止一种。
我想知道,如果它是正确的发送和接收自定义数据转换成 JSON String
每次如上所述。
或者有没有一种不用转换就可以直接发送和接收自定义dto的方法?我想使用泛型类型。怎么样?
2条答案
按热度按时间gstyhher1#
它必须转换成
byte[]
,但您可以使用框架提供的(反)序列化程序,而不是自己进行。https://docs.spring.io/spring-kafka/docs/current/reference/html/#json-塞德
fcwjkofz2#
有一种更好的方法可以通过kafka发送一个复杂类型,它包含一个可以自动序列化和反序列化的结构。
使用avro架构https://avro.apache.org/
添加合流模式注册表以处理序列化/反序列化过程和验证。汇合
看看这个例子,看看它是多么干净和高效。