我需要向apache kafka主题发送(post)json负载,但收到以下错误:-“message”:“无法将com.xyz.user类的值转换为value.serializer中指定的org.apache.kafka.common.serialization.stringserializer类”
spring还显示类转换异常:-java.lang.classcastexception:com.xyz.user不能转换为java.lang.string
下面是我的模态,Kafka配置和控制器
public class User {
private String firstname;
private String email;
public User() {}
public User(String firstname, String email) {
super();
this.firstname = firstname;
this.email = email;
}
public String getFirstname() {
return firstname;
}
public void setFirstname(String firstname) {
this.firstname = firstname;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
@Override
public String toString() {
return "UserModel [firstname=" + firstname + ", email=" + email + "]";
}
}
Kafka构型
package config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;
import com.xyz.User;
@Configuration
public class KafkaConfiguration {
@Bean
public ProducerFactory<String, User> producerFactory(){
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, User> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
控制器
@RestController
@RequestMapping("/kafka")
public class UserController {
@Autowired
private KafkaTemplate<String, User> kafkaTemplate;
private static String TOPIC = "kafka-producer";
@PostMapping("/publish")
public void getUserId(@RequestBody User user) {
kafkaTemplate.send(TOPIC, user);
Postman 发送的json有效负载
{
"firstname" : "xyz",
"email" : "xyz@gmail.com.com"
}
2条答案
按热度按时间46qrfjad1#
跟踪对我有用。
pkbketx92#
能够在我的本地设置中重现你的错误。
如果我们看一下日志,它会说
value.serializer
值仍引用默认值StringSerializer
而不是预期的是JsonSerializer
反过来说你的生产者配置没有生效。简言之,你的KafkaCOnfiguration
未引用类。你的
KafkaConfiguration
上课的时间有点长config
Package 和User
在某个com.xyz包中初始化。所以解决方案是确保它能够获取您的配置。最有可能的是,该包可能没有得到配置/beans定义的扫描。如果您将kafkanconfiguration移到应用程序的根包中,那么您的原始代码应该可以正常工作。如果你说你的kafkatemplate对象被注射了,那么实际上不是。被注入的是由spring-kafka自动配置定义的。