我的应用程序需要将不同的记录发送到不同的主题。我的应用程序正在使用同一个Kafka群集。由于应用程序使用同一个Kafka群集,因此创建一个生成器工厂就足够了(如果需要更多,请告诉我)。
在我看来,我有两个选择。
1.对两个主题使用相同的kafkaTemplate,并使用如下主题调用send方法(请假设我使用了Spring默认的Kafka生产者配置)。这里,我们需要为每个调用传递主题&我们对多个主题使用相同的Kafka模板。
class ProducerService {
@Autowired
private KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate;
public void send(String topic, GenericRecord key, GenericRecord value) {
ListenableFuture<SendResult<GenericRecord, GenericRecord>> future = kafkaTemplate.send(topic, key, value);
}
}
1.对不同的主题使用不同的Kafka模板。我想知道这样的设置是否会提高性能。
import org.apache.avro.generic.GenericRecord;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
public class KafkaConfig {
@Value("kafka.topic.first")
private String firstTopic;
@Value("kafka.topic.second")
private String secondTopic;
@Bean(name = "firstKafkaTemplate")
public KafkaTemplate<GenericRecord, GenericRecord> firstKafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> defaultKafkaProducerFactory) {
KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
kafkaTemplate.setDefaultTopic(firstTopic);
return kafkaTemplate;
}
@Bean(name = "secondKafkaTemplate")
public KafkaTemplate<GenericRecord, GenericRecord> secondKafkaTemplate(ProducerFactory<GenericRecord, GenericRecord> defaultKafkaProducerFactory) {
KafkaTemplate<GenericRecord, GenericRecord> kafkaTemplate = new KafkaTemplate<>(defaultKafkaProducerFactory);
kafkaTemplate.setDefaultTopic(secondTopic);
return kafkaTemplate;
}
}
class ProducerService {
@Autowired
@Qualifier("firstKafkaTemplate")
private KafkaTemplate<GenericRecord, GenericRecord> firstTopicTemplate;
@Autowired
@Qualifier("secondKafkaTemplate")
private KafkaTemplate<GenericRecord, GenericRecord> secondTopicTemplate;
public void send(String topic, GenericRecord key, GenericRecord value) {
ListenableFuture<SendResult<GenericRecord, GenericRecord>> future;
if ("first".equalsIgnoreCase(topic)) {
future = firstTopicTemplate.sendDefault(key, value);
} else if ("second".equalsIgnoreCase(topic)) {
future = secondTopicTemplate.sendDefault(key, value);
} else {
throw new RuntimeException("topic is not configured");
}
}
}
在内部,Kafka进行批处理,并通过一个单独的线程将批处理发送给Kafka。哪种方法是发送记录以获得性能的更好方法?还是性能没有差异?
1条答案
按热度按时间wljmcqd81#
我根据吞吐量回答自己的问题。当我处理记录时,遇到了超时问题。
单生成器在大多数情况下都是高效的
如果您遇到任何超时问题,因为排队记录的速度比它们可以发送的速度快得多。然后调整下面的参数来摆脱超时问题。请注意,这里我添加了虚拟值。您必须测试您的应用程序,以获得应用程序所需的值。
request.timeout.ms
发送数据时生成器等待服务器回复的时间将由此参数控制。如果达到超时而没有回复,则生成器将重试发送或以错误响应(通过异常错误或发送回调)。
linger.ms
linger.ms 控制在发送当前批之前等待更多消息的时间。Kafka生成器在当前批已满或达到linger.ms限制时发送一批消息。默认情况下,生成器将在有发送器线程可用于发送消息时立即发送消息,即使批中只有一条消息。通过将linger.ms设置为大于0,我们指示生成器等待几毫秒,以便在将批发送到代理之前向批中添加其他消息。这会增加延迟,但也会增加吞吐量(因为我们一次发送更多消息,所以每条消息的开销更少)。
批量
当多个记录被发送到同一分区时,生成器将它们一起进行批处理。(不是消息!)。当批处理已满时,将发送批处理中的所有消息。但这并不意味着生产者将等待批处理变满。生产者将发送半因此,将批处理大小设置得过大不会导致发送消息时出现延迟;它只会为批处理使用更多的内存。2将批处理大小设置得太小会增加一些开销,因为生成器将需要更频繁地发送消息。
delivery.timeout.ms
在呼叫发送后报告成功或失败的时间上限()返回。这限制了记录在发送之前将延迟的总时间,即等待代理确认的时间(如果需要),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已用尽,或者记录被添加到已达到较早的传递过期期限的批处理中。此配置的值应大于或等于request.timeout.ms与linger. ms之和。
如果您仍然面临超时问题,则需要更多生成器
通过增加同一Kafka模板的线程来增加生成器
为此,在创建生成器工厂时,必须将下面的setProducerPerThread设置为True。
我添加了一个TaskExecutor来控制生产者的数量,因为生产者的数量=线程的数量
不要改变你的Kafka密码。让它保持原样。我们将创建一个新的层来使它工作。
使用上述设置,最初将有5个生成器开始发送记录,当负载变高时,将增加到15个生成器
使用多个Kafka模板
如果你认为,你仍然没有达到你的吞吐量,那么你可以尝试增加模板的数量。但实际上,我没有尝试这个,因为我得到了第二种方法的理想结果。