如何控制Reactor Kafka/Reactive Kafka中的Kafka连接重试

yiytaume  于 2023-03-07  发布在  Apache
关注(0)|答案(1)|浏览(145)

我在 Spring 启动应用程序中使用Reactor-Kafka。我正在尝试处理 transient 错误,如果Kafka关闭,则我希望无限期地重试连接到Kafka。
我观察到默认行为是,被动kafka消耗批处理,然后处理每个记录并自动提交整个批处理。现在,如果在批处理之间,kafka连接丢失,那么默认情况下它会尝试在大约10秒内重新连接100次**。那么,如何修改此行为呢?
我已经尝试设置以下属性:

reconnect.backoff.ms: 2000ms
reconnect.backoff.max.ms: 5000ms

还尝试了以下属性:

akka.kafka.consumer.reconnection-max-retries: 300
akka.kafka.consumer.reconnection-min-backoff: 1000ms
akka.kafka.consumer.reconnection-max-backoff: 5m
akka.kafka.consumer.reconnection-random-factor: 0.5

但没有一个属性工作,我无法增加/减少100次重试。
注意:我还使用Kafka Autoconfig进行配置。
如何做到这一点?

    • 问题更新**
    • 1.我使用自动配置的方式是使用KafkaProperties类**:
@Bean
public ReceiverOptions<String, String> receiverOptions(KafkaProperties kafkaProperties) {

    return ReceiverOptions
          .<String, String>create(kafkaProperties.buildConsumerProperties())
          .subscription(Collections.singleton("topic-name"));

}

KafkaProperties将自动从www.example.com读取配置application.properties/application.yml

    • 2.**当Kafka试图重新连接时,它会连接100次,然后停止消耗它。那么,如何控制这个数字呢?
irlmq6kh

irlmq6kh1#

Akka与Reactor Kafka和Sping Boot 没有任何关系。所以,这些akka.kafka.consumer肯定不会应用于您的Reactor Kafka用户。
另一方面,在Sping Boot 中还没有Reactor Kafka自动配置:https://github.com/spring-projects/spring-boot/pull/30567。所以,同样,该信息是不相关的,无论您想在application.properties中配置什么,在REactor Kafka中的KafkaReceiver都不会被ReceiverOptions自动拾取。您可以手动使用ReceiverOptions选项填充,例如,您可以将reconnect.backoff.ms设置为static <K, V> ReceiverOptions<K, V> create(@NonNull Map<String, Object> configProperties) {
您还可以注意ReceiverOptionsmaxCommitAttemptscommitRetryInterval属性。

相关问题