我是Kafka的新手,我正在尝试读取一个文本文件,并创建一个我想发送给消费者的字符串列表。我正在使用Java 21和Sping Boot 3. 2. 0(SNAPSHOT)。
这是Kafka生产者配置:
@Configuration
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
public Map<String, Object> producerConfig(){
Map<String,Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
return properties;
}
@Bean
public ProducerFactory<String, List<String>> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfig());
}
@Bean
public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
return new KafkaTemplate<>(producerFactory);
}
}
字符串
这是主题配置:
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic alexTopic(){
return TopicBuilder.name("securityTopic")
.build();
}
}
型
这是“application.properties“:
spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092
型
这就是我接收文件的方式:
public Optional uploadFile(MultipartFile file){
if(file.isEmpty()){
return Optional.of(new EmptyFileException("File is empty please double check"));
}
return Optional.of(this.upload.uploadFile(file));
}
型
我是这样把“名单“发给Kafka的:
public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
@Override
public String uploadFile(MultipartFile file) {
try(BufferedReader reader = new BufferedReader(
new InputStreamReader(file.getInputStream()))){
String line;
int i=0;
List<String> list = new ArrayList<>();
while ((line=reader.readLine())!=null) {
if(i==10){
this.kafkaTemplate.send("securityTopic",list);
型
如果我发送纯字符串,我使用:
StringSerializer.class
型
而不是:
ListSerializer.class.getName()
型
在Kafka producer配置中,还将其他泛型更改为String而不是List正在工作。但是我如何发送List呢?我再次使用Kafka 2或3天,我不知道是否必须创建自己的序列化器或使用内置的东西?
给我的错误是在标题中:“未能构造Kafka生产者”。
我也试过使用“ListSerializer.class”,但不起作用。
我也尝试使用“JsonSerialize.class”,仍然是同样的错误。
感谢您抽出时间阅读本文!
1条答案
按热度按时间8iwquhpp1#
这是错误的导入。正确的导入是:
“import org.springframework.Kafka.support.serializer.JsonSerializer;”