当前,当我创建producer来发送我的记录时,例如由于某些原因kafka不可用,producer会无限期地发送相同的消息。如何停止生成消息(例如,在我收到此错误3次后):
Connection to node -1 could not be established. Broker may not be available.
我用的是React堆Kafka制作人:
@Bean
public KafkaSender<String, String> createSender() {
return KafkaSender.create(senderOptions());
}
private SenderOptions<String, String> senderOptions() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ProducerConfig.CLIENT_ID_CONFIG, kafkaProperties.getClientId());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.RETRIES_CONFIG, kafkaProperties.getProducerRetries());
return SenderOptions.create(props);
}
然后用它来发送记录:
sender.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topicName, null, message), message)))
.flatMap(result -> {
if (result.exception() != null) {
return Flux.just(ResponseEntity.badRequest()
.body(result.exception().getMessage()));
}
return Flux.just(ResponseEntity.ok().build());
})
.next();
3条答案
按热度按时间olhwl3o21#
而不是专注于错误。解决问题-它没有连接到代理
您没有在撰写文件中覆盖此项,因此您的应用程序正在尝试连接到自身
在作曲中,你好像忘了这个
或者,如果可能,可以使用现有的融合rest代理docker映像,而不是重新创建轮子
bxgwgixi2#
您可以使用断路器模式来解决此类问题,但在应用此模式之前,请尝试查找根本原因,并且您的producerconfig.retries\u config属性似乎在某个地方被重写。
dvtswwa33#
我担心
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
不参与重试,它将迭代到maxBlockTimeMs = 60000
默认情况下。您可以通过ProducerConfig.MAX_BLOCK_MS_CONFIG
属性:更新
我们可以这样解决问题:
注意安全
sender.close();
如果出现错误。我认为是时候提出一个反对KafkaReact堆项目的问题,以允许关闭错误的生产者。