发送字符串列表时构造Kafka生产者失败

w41d8nur  于 2023-11-16  发布在  Apache
关注(0)|答案(1)|浏览(123)

我是Kafka的新手,我正在尝试读取一个文本文件,并创建一个我想发送给消费者的字符串列表。我正在使用Java 21和Sping Boot 3. 2. 0(SNAPSHOT)。
这是Kafka生产者配置:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());

        return properties;
    }

    @Bean
    public ProducerFactory<String, List<String>> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

字符串
这是主题配置:

@Configuration
public class KafkaTopicConfig {
    
    @Bean
    public NewTopic alexTopic(){
        return TopicBuilder.name("securityTopic")
                .build();
    }
}


这是“application.properties“:

spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092


这就是我接收文件的方式:

public Optional uploadFile(MultipartFile file){
    if(file.isEmpty()){
        return Optional.of(new EmptyFileException("File is empty please double check"));
    }
    
    return Optional.of(this.upload.uploadFile(file));
}


我是这样把“名单“发给Kafka的:

public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

@Override
public String uploadFile(MultipartFile file) {
    try(BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(file.getInputStream()))){

        String line;
        int i=0;
        List<String> list = new ArrayList<>();
        while ((line=reader.readLine())!=null) {
            if(i==10){
                this.kafkaTemplate.send("securityTopic",list);


如果我发送纯字符串,我使用:

StringSerializer.class


而不是:

ListSerializer.class.getName()


在Kafka producer配置中,还将其他泛型更改为String而不是List正在工作。但是我如何发送List呢?我再次使用Kafka 2或3天,我不知道是否必须创建自己的序列化器或使用内置的东西?
给我的错误是在标题中:“未能构造Kafka生产者”。
我也试过使用“ListSerializer.class”,但不起作用。
我也尝试使用“JsonSerialize.class”,仍然是同样的错误。
感谢您抽出时间阅读本文!

8iwquhpp

8iwquhpp1#

这是错误的导入。正确的导入是:
“import org.springframework.Kafka.support.serializer.JsonSerializer;”

相关问题