我是一个学生,试图学习apache Kafka。我试图在使用java的spring Boot 中使用kafka将JSON对象作为消息发送。但是当我试图发送它时,它抛出了一个错误,说我的模型类不能被转换为字符串,即使我在www.example.com文件中提到了Json序列化器application.properties。例外是:
java.lang.ClassCastException: class com.example.demo.model.BookES cannot be cast to class java.lang.String (com.example.demo.model.BookES is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
我的application.properties文件是
server.port=8081
spring.kafka.consumer.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
我的控制器类,我正尝试在其中发送消息
@PostMapping("/publish")
public ResponseEntity<String> publish(@RequestBody BookES bookES){
logger.info("in publish method");
kafkaProducer.sendMessage(bookES);
return ResponseEntity.ok("Json message sent to kafka topic");
}
我的Kafka生产者类有sendMessage方法:
package com.example.demo.kafka;
import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import com.example.demo.controller.BookController;
import com.example.demo.model.Book;
import com.example.demo.model.BookES;
@Service
public class KafkaProducer {
@Autowired
private NewTopic topic;
Logger logger = LoggerFactory.getLogger(KafkaProducer.class);
private String topicName = "bookmanagement";
@Autowired
private KafkaTemplate<String, BookES> kafkaTemplate;
public void sendMessage(BookES bookES) {
logger.info("in sendMessage method");
logger.info(String.format("Message sent -> %s",bookES.toString()));
Message<BookES> message = MessageBuilder.withPayload(bookES).setHeader(KafkaHeaders.TOPIC, topic.name()).build();
kafkaTemplate.send(message);
}
}
我的模型类:
package com.example.demo.model;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
@Document(indexName="my-application")
@Data
@AllArgsConstructor
@NoArgsConstructor
@Getter
@Setter
public class BookES{
@Override
public String toString() {
return "BookES [bookId=" + bookId + ", bookName=" + bookName + ", description=" + description + "]";
}
@Id
private String bookId;
private String bookName;
private String description;
public String getBookId() {
// TODO Auto-generated method stub
return this.bookId;
}
public String getBookName() {
return bookName;
}
public void setBookName(String bookName) {
this.bookName = bookName;
}
public String getDescription() {
return description;
}
public void setDescription(String description) {
this.description = description;
}
public void setBookId(String bookId) {
this.bookId = bookId;
}
}
我的项目在github链接上:github link
我试着用不同的注解(如@JsonSerializer等)来旋转我的模型类,但没有成功。我在postman上得到的回应是:
{
"timestamp": "2022-11-22T11:24:30.738+00:00",
"status": 500,
"error": "Internal Server Error",
"message": "Can't convert value of class com.example.demo.model.BookES to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer",
"path": "/books/publish"
}
1条答案
按热度按时间au9on6nz1#
你需要提供你自己的序列化器,它可以将你的
BookES
序列化为字节数组。下面是一个我用作通用Json序列化器的类。它应该可以很好地处理你的类:在您的属性中,您需要将该类注册为反序列化程序。请注意,当您读回消息时,如果您希望将其作为
BookES
类读取,则需要提供反序列化程序类,该类将接受字节数组并将其转换回BookES
类。(您必须实现org.apache.kafka.common.serialization.Deserializer
接口)。(由我编写和维护)。您可以很容易地替换它,只需使用Json-Jackson库中的ObjectMapper
类或使用GSON库即可。但它可能会使使用JsonUtils
更简单。如果您希望使用它,请使用Javadoc for JsonUtils class。可以通过Maven Artifact或Github上的