我在 Spring 启动应用程序中使用Reactor-Kafka。我正在尝试处理 transient 错误,如果Kafka关闭,则我希望无限期地重试连接到Kafka。
我观察到默认行为是,被动kafka消耗批处理,然后处理每个记录并自动提交整个批处理。现在,如果在批处理之间,kafka连接丢失,那么默认情况下它会尝试在大约10秒内重新连接100次**。那么,如何修改此行为呢?
我已经尝试设置以下属性:
reconnect.backoff.ms: 2000ms
reconnect.backoff.max.ms: 5000ms
还尝试了以下属性:
akka.kafka.consumer.reconnection-max-retries: 300
akka.kafka.consumer.reconnection-min-backoff: 1000ms
akka.kafka.consumer.reconnection-max-backoff: 5m
akka.kafka.consumer.reconnection-random-factor: 0.5
但没有一个属性工作,我无法增加/减少100次重试。
注意:我还使用Kafka Autoconfig进行配置。
如何做到这一点?
- 问题更新**
- 1.我使用自动配置的方式是使用KafkaProperties类**:
@Bean
public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {
return ReceiverOptions
.<String, String>create(kafkaProperties.buildConsumerProperties())
.subscription(Collections.singleton("topic-name"));
}
KafkaProperties将自动从www.example.com读取配置application.properties/application.yml
- 2.**当Kafka试图重新连接时,它会连接100次,然后停止消耗它。那么,如何控制这个数字呢?
1条答案
按热度按时间irlmq6kh1#
Akka与Reactor Kafka和Sping Boot 没有任何关系。所以,这些
akka.kafka.consumer
肯定不会应用于您的Reactor Kafka用户。另一方面,在Sping Boot 中还没有Reactor Kafka自动配置:https://github.com/spring-projects/spring-boot/pull/30567。所以,同样,该信息是不相关的,无论您想在
application.properties
中配置什么,在REactor Kafka中的KafkaReceiver
都不会被ReceiverOptions
自动拾取。您可以手动使用ReceiverOptions
选项填充,例如,您可以将reconnect.backoff.ms
设置为static <K, V> ReceiverOptions<K, V> create(@NonNull Map<String, Object> configProperties) {
。您还可以注意
ReceiverOptions
的maxCommitAttempts
和commitRetryInterval
属性。