Spring Boot: error when sending a JSON object as a message on Kafka

4ioopgfo posted on 2022-11-23 in Spring

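I am a student trying to learn Apache Kafka. I am trying to send a JSON object as a message through Kafka in a Spring Boot application written in Java. When I try to send it, an exception is thrown saying that my model class cannot be cast to String, even though I have specified the JSON serializer in my application.properties file. The exception is: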

java.lang.ClassCastException: class com.example.demo.model.BookES cannot be cast to class java.lang.String (com.example.demo.model.BookES is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')

My application.properties file is:

server.port=8081

spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

My controller class, where I try to send the message:

@PostMapping("/publish")
    public ResponseEntity<String> publish(@RequestBody BookES bookES){
        logger.info("in publish method");
        kafkaProducer.sendMessage(bookES);
        return ResponseEntity.ok("Json message sent to kafka topic");
    }

My Kafka producer class, which contains the sendMessage method:

package com.example.demo.kafka;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import com.example.demo.controller.BookController;
import com.example.demo.model.Book;
import com.example.demo.model.BookES;

@Service
public class KafkaProducer {
    
    @Autowired
    private NewTopic topic;
    
    Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
    
    private String topicName = "bookmanagement";
    
    @Autowired
    private KafkaTemplate<String, BookES> kafkaTemplate;

    
    
    public void sendMessage(BookES bookES) {
        
        logger.info("in sendMessage method");
        logger.info(String.format("Message sent -> %s",bookES.toString()));
        
        Message<BookES> message = MessageBuilder.withPayload(bookES).setHeader(KafkaHeaders.TOPIC, topic.name()).build();
        kafkaTemplate.send(message);
        
        
        
    }
    
    
}

My model class:

package com.example.demo.model;

import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Document(indexName="my-application")
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class BookES{
    @Override
    public String toString() {
        return "BookES [bookId=" + bookId + ", bookName=" + bookName + ", description=" + description + "]";
    }
    @Id
    private String bookId;
    private String bookName;
    private String description;
    public String getBookId() {
        // TODO Auto-generated method stub
        return this.bookId;
    }
    public String getBookName() {
        return bookName;
    }
    public void setBookName(String bookName) {
        this.bookName = bookName;
    }
    public String getDescription() {
        return description;
    }
    public void setDescription(String description) {
        this.description = description;
    }
    public void setBookId(String bookId) {
        this.bookId = bookId;
    }

    
}

My project is on GitHub: github link
I have tried annotating my model class with various annotations (such as @JsonSerializer), without success. The response I get in Postman is:

{
    "timestamp": "2022-11-22T11:24:30.738+00:00",
    "status": 500,
    "error": "Internal Server Error",
    "message": "Can't convert value of class com.example.demo.model.BookES to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer",
    "path": "/books/publish"
}
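
Read together, the two messages suggest that the producer actually in use has StringSerializer as its value serializer, so a BookES value ends up being cast to String. The following is a minimal, library-free sketch of that mechanism. The `Serializer` interface, `StringOnlySerializer`, and `Book` are hypothetical stand-ins written for this illustration; they are not Kafka's real classes, and the sketch does not show why the configured JsonSerializer is not being picked up.

```java
import java.nio.charset.StandardCharsets;

public class SerializerMismatchDemo {

    // Hypothetical stand-in for a generic serializer contract; NOT Kafka's interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Behaves like a String-only serializer: it expects the value to be a String.
    static class StringOnlySerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Hypothetical payload type standing in for BookES.
    static class Book {
        final String bookName;

        Book(String bookName) {
            this.bookName = bookName;
        }
    }

    // Generic types are erased at runtime, so a serializer selected from
    // configuration can be handed a value of the wrong type. The mismatch only
    // surfaces as a ClassCastException when serialize() is invoked.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String trySerialize(Serializer configured, Object value) {
        try {
            byte[] bytes = configured.serialize("bookmanagement", value);
            return "ok: " + bytes.length + " bytes";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        Serializer<String> serializer = new StringOnlySerializer();
        System.out.println(trySerialize(serializer, "plain text"));
        System.out.println(trySerialize(serializer, new Book("Kafka in Action")));
    }
}
```

A String value is serialized normally, while the object value fails with a ClassCastException, which matches the shape of the exception quoted in the question.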

au9on6nz1#

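You need to provide your own serializer that can serialize your BookES object into a byte array. Here is a class that I use as a generic JSON serializer. It should handle your class just fine: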

package ***;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.mgnt.utils.JsonUtils;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class GenericJsonSerializer implements Serializer<Object> {
    @Override
    public byte[] serialize(String s, Object obj) {
        byte[] result;
        try {
            result = JsonUtils.writeObjectToJsonString(obj).getBytes(StandardCharsets.UTF_8);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error occurred while serializing " + obj.getClass().getCanonicalName() + " to byte[]", e);
        }
        return result;
    }
}

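In your properties, you need to register this class as your serializer. Note that when you read the message back, if you want to read it as a BookES instance, you need to provide a deserializer class that takes the byte array and converts it back to BookES (you will have to implement the org.apache.kafka.common.serialization.Deserializer interface). The example uses the JsonUtils class from the open-source MgntUtils library (written and maintained by me). You can easily replace it with the ObjectMapper class from the Jackson JSON library, or with the GSON library, but JsonUtils may make things simpler. If you want to use it, see the Javadoc for the JsonUtils class. The library is available as a Maven artifact and on GitHub.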
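
As a rough illustration of the registration step, the producer section of application.properties might look like the fragment below. The package `com.example.demo.kafka` is an assumption (the answer elides the package name), so substitute the package where GenericJsonSerializer actually lives. Note also that Kafka's string serializer is `org.apache.kafka.common.serialization.StringSerializer`; the key-serializer line in the question omits the `kafka` segment.

```properties
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Assumed package name; point this at your own GenericJsonSerializer class.
spring.kafka.producer.value-serializer=com.example.demo.kafka.GenericJsonSerializer
```

The consumer side would need a matching `spring.kafka.consumer.value-deserializer` entry that points at a deserializer able to turn the bytes back into BookES.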
