我们可以在Spring Boot 中使用多个Kafka模板吗?

km0tfn4u  于 2023-10-15  发布在  Apache
关注(0)|答案(3)|浏览(117)

在我的Spring Boot Kafka发布应用程序中,我希望提供对以String(JSON)或字节格式发布消息的支持,因为我希望同时提供对JSON和Avro的支持。但是KafkaTemplate的自动配置在Spring Boot 中只允许我们定义其中一个模板。有没有一种方法可以同时使用这两个模板,或者有没有其他方法可以同时支持JSON和Avro?
KafkaTemplate<String, String>只适用于字符串,但我也想发布avro,它应该是类似KafkaTemplate<String, byte[]>的东西

kupeojn6

kupeojn61#

您可以尝试基于不同的ProducerFactory分别创建多个KafkaTemplate:

@Bean
public ProducerFactory<String, String> producerFactoryString() {
    // Factory for JSON / plain-text payloads: both key and value go out as Strings.
    Map<String, Object> props = new HashMap<>();
    // additional config parameters (bootstrap servers, acks, retries, ...)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public ProducerFactory<String, byte[]> producerFactoryByte() {
    // Factory for binary (e.g. Avro-encoded) payloads: String key, raw byte[] value.
    Map<String, Object> configProps = new HashMap<>();
    //additional config parameters ....
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // FIX: BytesSerializer serializes org.apache.kafka.common.utils.Bytes, not byte[].
    // A ProducerFactory<String, byte[]> must use ByteArraySerializer, otherwise every
    // send() fails at runtime with a ClassCastException inside the serializer.
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplateString() {
    // Template for String/JSON publishing, backed by the String producer factory.
    ProducerFactory<String, String> factory = producerFactoryString();
    return new KafkaTemplate<>(factory);
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplateByte() {
    // Template for binary publishing, backed by the byte[] producer factory.
    ProducerFactory<String, byte[]> factory = producerFactoryByte();
    return new KafkaTemplate<>(factory);
}
b5lpy0ml

b5lpy0ml2#

您可以创建Kafka配置。我不得不将数据发送到两个不同的服务器。

/**
 * Builds two fully independent producer stacks so the application can publish
 * to two different Kafka clusters (the "Mosaic" and "Stream" targets), each
 * with its own SSL trust store, SASL credentials, value serializer, client id
 * and bootstrap servers.
 */
@Configuration
public class KafkaConfig {

    // Per-target connection settings, bound from application properties elsewhere
    // in the project (classes not visible in this file).
    private final MosaicKafkaConfig mosaicKafkaConfig;
    private final StreamKafkaConfig streamKafkaConfig;

    public KafkaConfig(MosaicKafkaConfig mosaicKafkaConfig, StreamKafkaConfig streamKafkaConfig) {
        this.mosaicKafkaConfig = mosaicKafkaConfig;
        this.streamKafkaConfig = streamKafkaConfig;
    }

    /**
     * Producer factory wired to the Mosaic cluster.
     *
     * NOTE(review): a fresh {@code new KafkaProperties()} starts from defaults,
     * so any global {@code spring.kafka.*} settings in application.yml are
     * ignored for this producer — presumably intentional because each target
     * carries its own config object; confirm.
     */
    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForMosaic() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        // Resolve the configured trust-store location (classpath:/file: URL) to a Resource.
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(mosaicKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(mosaicKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(mosaicKafkaConfig.getSslTrustStoreType());
        // SASL settings have no typed setters on KafkaProperties, so they are
        // injected as free-form string properties.
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", mosaicKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", mosaicKafkaConfig.getSaslMechanism());
        props.put("security.protocol", mosaicKafkaConfig.getSaslSecProtocol());

        // NOTE(review): "getValaueSerializer" carries a typo ("Valaue") in the
        // external config class; it can only be renamed there, not here.
        kafkaProperties.getProducer().setValueSerializer(mosaicKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(mosaicKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(mosaicKafkaConfig.getBootstrapServers());

        // Flatten everything configured above into the raw producer config map.
        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    // Template for the Mosaic cluster; Spring injects the factory bean above by name.
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForMosaic(ProducerFactory<Object, Object> kafkaProducerFactoryForMosaic) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForMosaic);
        return kafkaTemplate;
    }

    /**
     * Producer factory wired to the Stream cluster. Mirrors
     * {@link #kafkaProducerFactoryForMosaic()} but reads from streamKafkaConfig.
     */
    @Bean
    public ProducerFactory<?, ?> kafkaProducerFactoryForStream() {
        KafkaProperties kafkaProperties = new KafkaProperties();
        KafkaProperties.Ssl ssl = kafkaProperties.getSsl();
        ResourceLoader resourceLoader = new DefaultResourceLoader();
        Resource resource = resourceLoader.getResource(streamKafkaConfig.getSslTrustStoreLocation());
        ssl.setTrustStoreLocation(resource);
        ssl.setTrustStorePassword(streamKafkaConfig.getSslTrustStorePassword());
        ssl.setTrustStoreType(streamKafkaConfig.getSslTrustStoreType());
        Map<String, String> props = kafkaProperties.getProperties();
        props.put("sasl.jaas.config", streamKafkaConfig.getSaslConfig());
        props.put("sasl.mechanism", streamKafkaConfig.getSaslMechanism());
        props.put("security.protocol", streamKafkaConfig.getSaslSecProtocol());

        kafkaProperties.getProducer().setValueSerializer(streamKafkaConfig.getValaueSerializer());
        kafkaProperties.getProducer().setClientId(streamKafkaConfig.getClientID());
        kafkaProperties.getProducer().setBootstrapServers(streamKafkaConfig.getBootstrapServers());

        Map<String, Object> configProps = kafkaProperties.buildProducerProperties();
        return new DefaultKafkaProducerFactory<>(configProps);

    }

    // Template for the Stream cluster; factory bean injected by name, as above.
    @Bean
    public KafkaTemplate<?, ?> kafkaTemplateForStream(ProducerFactory<Object, Object> kafkaProducerFactoryForStream) {
        KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<>(kafkaProducerFactoryForStream);
        return kafkaTemplate;
    }
}
g6ll5ycj

g6ll5ycj3#

只是延伸了@i.bondarenko的答案
如果你想从属性文件中配置所有的属性,那么你可以编写自己的自定义Kafka配置来支持多个生产者配置,比如:

# Custom prefix (not spring.kafka) bound by the KafkaCustomProperties class below.
kafka:
  producer:
    # Shared by every named producer section.
    bootstrap-servers: localhost:9092,localhost:9093,localhost:9094
    # Each named section becomes one entry in the producer map; the section
    # name ("producer-string" / "producer-byte") selects a KafkaTemplate bean.
    producer-string:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    producer-byte:
      retries: 0
      acks: all
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # NOTE(review): BytesSerializer expects org.apache.kafka.common.utils.Bytes
      # payloads; for raw byte[] values ByteArraySerializer is the usual choice — confirm.
      value-serializer: org.apache.kafka.common.serialization.BytesSerializer

用一个配置类读取这些属性:

/**
 * Binds the custom "kafka" properties prefix, supporting multiple named
 * producer/consumer sections (e.g. kafka.producer.producer-string.*) on top of
 * the common connection settings.
 */
@Configuration
@ConfigurationProperties(prefix = "kafka")
@Getter
@Setter
public class KafkaCustomProperties {
  // Common bootstrap servers used by every producer/consumer built from these properties.
  private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
  private String clientId;
  // Free-form client properties merged verbatim into every config map.
  private Map<String, String> properties = new HashMap<>();
  // Named producer/consumer sections, keyed by section name (e.g. "producer-string").
  private Map<String, KafkaProperties.Producer> producer;
  private Map<String, KafkaProperties.Consumer> consumer;
  private KafkaProperties.Ssl ssl = new KafkaProperties.Ssl();
  private KafkaProperties.Security security = new KafkaProperties.Security();

  /**
   * Builds the client config shared by all producers/consumers: bootstrap
   * servers, client id, SSL and security settings, plus the free-form entries.
   *
   * @return a fresh, mutable map of common Kafka client properties
   */
  public Map<String, Object> buildCommonProperties() {
    // Renamed from "properties": the old local shadowed the field of the same
    // name read a few lines below, which was an easy source of confusion.
    Map<String, Object> commonProps = new HashMap<>();
    if (this.bootstrapServers != null) {
      commonProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    }
    if (this.clientId != null) {
      commonProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, this.clientId);
    }
    commonProps.putAll(this.ssl.buildProperties());
    commonProps.putAll(this.security.buildProperties());
    if (!CollectionUtils.isEmpty(this.properties)) {
      commonProps.putAll(this.properties);
    }
    return commonProps;
  }
}

使用此配置为每个使用@Qualifier注解的生产者生成KafkaTemplate bean

/**
 * Exposes one KafkaTemplate bean per named producer section declared under the
 * "kafka.producer" prefix (see KafkaCustomProperties), distinguished by qualifier.
 */
@Configuration
@RequiredArgsConstructor
@Slf4j
public class KafkaMultipleProducerConfig {

  private final KafkaCustomProperties kafkaCustomProperties;

  /** Template backed by the "producer-string" property section. */
  @Bean
  @Qualifier("producer-string")
  public KafkaTemplate<String, Object> producerStringKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer-string"));
  }

  /** Template backed by the "producer-byte" property section. */
  @Bean
  @Qualifier("producer-byte")
  public KafkaTemplate<String, Object> producerByteKafkaTemplate() {
    return new KafkaTemplate<>(producerFactory("producer-byte"));
  }

  /**
   * Merges the common client settings with the named producer section
   * (when such a section exists) and wraps the result in a producer factory.
   */
  private ProducerFactory<String, Object> producerFactory(String producerName) {
    Map<String, Object> merged = new HashMap<>(kafkaCustomProperties.buildCommonProperties());
    Map<String, KafkaProperties.Producer> sections = kafkaCustomProperties.getProducer();
    if (nonNull(sections)) {
      KafkaProperties.Producer section = sections.get(producerName);
      if (nonNull(section)) {
        merged.putAll(section.buildProperties());
      }
    }
    log.info("Kafka Producer '{}' properties: {}", producerName, merged);
    return new DefaultKafkaProducerFactory<>(merged);
  }
}

并使用这些KafkaTemplate bean将消息发布到不同的生产者配置。
请参考https://codingnconcepts.com/spring-boot/configure-multiple-kafka-producer/的帖子以获得详细解释。

相关问题