apache-kafka 哪一个是最好的使用单独的Kafka模板/使用相同的Kafka模板为不同的主题

czq61nw1  于 2022-11-01  发布在  Apache
关注(0)|答案(1)|浏览(113)

我的应用程序需要将不同的记录发送到不同的主题。我的应用程序正在使用同一个Kafka群集。由于应用程序使用同一个Kafka群集,因此创建一个生成器工厂就足够了(如果需要更多,请告诉我)。
在我看来,我有两个选择。
1.对两个主题使用相同的kafkaTemplate,并使用如下主题调用send方法(请假设我使用了Spring默认的Kafka生产者配置)。这里,我们需要为每个调用传递主题&我们对多个主题使用相同的Kafka模板。

class ProducerService {
    @Autowired
    private KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;

    public void send(String topic, GenericRecord key, GenericRecord value) {
        ListenableFuture<SendResult<GenericRecord, GenericRecord>> future = kafkaTemplate.send(topic, key, value);
    }
}

1.对不同的主题使用不同的Kafka模板。我想知道这样的设置是否会提高性能。

import org.apache.avro.generic.GenericRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaConfig {

    @Value("kafka.topic.first")
    private String firstTopic;
    @Value("kafka.topic.second")
    private String secondTopic;

    @Bean(name = "firstKafkaTemplate")
    public KafkaTemplate<GenericRecord, GenericRecord> firstKafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> defaultKafkaProducerFactory) {
        KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
        kafkaTemplate.setDefaultTopic(firstTopic);
        return kafkaTemplate;
    }

    @Bean(name = "secondKafkaTemplate")
    public KafkaTemplate<GenericRecord, GenericRecord> secondKafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> defaultKafkaProducerFactory) {
        KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
        kafkaTemplate.setDefaultTopic(secondTopic);
        return kafkaTemplate;
    }

}

class ProducerService {
    @Autowired
    @Qualifier("firstKafkaTemplate")
    private KafkaTemplate<GenericRecord, GenericRecord> firstTopicTemplate;

    @Autowired
    @Qualifier("secondKafkaTemplate")
    private KafkaTemplate<GenericRecord, GenericRecord> secondTopicTemplate;

    public void send(String topic, GenericRecord key, GenericRecord value) {
        ListenableFuture<SendResult<GenericRecord, GenericRecord>> future;
        if ("first".equalsIgnoreCase(topic)) {
            future = firstTopicTemplate.sendDefault(key, value);
        } else if ("second".equalsIgnoreCase(topic)) {
            future = secondTopicTemplate.sendDefault(key, value);
        } else {
            throw new RuntimeException("topic is not configured");
        }
    }
}

在内部,Kafka进行批处理,并通过一个单独的线程将批处理发送给Kafka。哪种方法是发送记录以获得性能的更好方法?还是性能没有差异?

wljmcqd8

wljmcqd81#

我根据吞吐量回答自己的问题。当我处理记录时,遇到了超时问题。

单生成器在大多数情况下都是高效的

如果您遇到任何超时问题,因为排队记录的速度比它们可以发送的速度快得多。然后调整下面的参数来摆脱超时问题。请注意,这里我添加了虚拟值。您必须测试您的应用程序,以获得应用程序所需的值。

spring.kafka.producer.properties.[linger.ms]=100
spring.kafka.producer.properties.[batch.size]=100000
spring.kafka.producer.properties.[request.timeout.ms]=30000
spring.kafka.producer.properties.[delivery.timeout.ms]=200000
request.timeout.ms

发送数据时生成器等待服务器回复的时间将由此参数控制。如果达到超时而没有回复,则生成器将重试发送或以错误响应(通过异常错误或发送回调)。

linger.ms

linger.ms 控制在发送当前批之前等待更多消息的时间。Kafka生成器在当前批已满或达到linger.ms限制时发送一批消息。默认情况下,生成器将在有发送器线程可用于发送消息时立即发送消息,即使批中只有一条消息。通过将linger.ms设置为大于0,我们指示生成器等待几毫秒,以便在将批发送到代理之前向批中添加其他消息。这会增加延迟,但也会增加吞吐量(因为我们一次发送更多消息,所以每条消息的开销更少)。

批量

当多个记录被发送到同一分区时,生成器将它们一起进行批处理。(不是消息!)。当批处理已满时,将发送批处理中的所有消息。但这并不意味着生产者将等待批处理变满。生产者将发送半因此,将批处理大小设置得过大不会导致发送消息时出现延迟;它只会为批处理使用更多的内存。2将批处理大小设置得太小会增加一些开销,因为生成器将需要更频繁地发送消息。

delivery.timeout.ms

在呼叫发送后报告成功或失败的时间上限()返回。这限制了记录在发送之前将延迟的总时间,即等待代理确认的时间(如果需要),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已用尽,或者记录被添加到已达到较早的传递过期期限的批处理中。此配置的值应大于或等于request.timeout.ms与linger. ms之和。

如果您仍然面临超时问题,则需要更多生成器

通过增加同一Kafka模板的线程来增加生成器

为此,在创建生成器工厂时,必须将下面的setProducerPerThread设置为True。
我添加了一个TaskExecutor来控制生产者的数量,因为生产者的数量=线程的数量

@Configuration
Public class Conf{

   @Bean("kafkaTaskExecutor")
    public TaskExecutor getKafkaAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(15);
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("Kafka-Async-");
        return executor;
    }

  @Bean
    public KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> producerFactory) {
        if (producerFactory instanceof DefaultKafkaProducerFactory<GenericRecord, GenericRecord> defaultFactory) {
            defaultFactory.setProducerPerThread(true);
        }
        return new KafkaTemplate<>(producerFactory);
    }
}

不要改变你的Kafka密码。让它保持原样。我们将创建一个新的层来使它工作。

class AsyncProducer{

       @Autowired
       private KafkaProducer producer;

      @Value("${topic.name}")
      private String topic;

     @Autowired
     @Qualifier("kafkaTaskExecutor")
     private TaskExecutor taskExecutor;

      public void sendAsync(GenericRecord key, GenericRecord value){
          CompletableFuture.completeFuture(value).thenAcceptAsync( val->   producer.send(topic,key,value), taskExecutor);
     }

}

使用上述设置,最初将有5个生成器开始发送记录,当负载变高时,将增加到15个生成器

使用多个Kafka模板

如果你认为,你仍然没有达到你的吞吐量,那么你可以尝试增加模板的数量。但实际上,我没有尝试这个,因为我得到了第二种方法的理想结果。

相关问题