consumer.committablesource在连接到kafka集群时如何处理错误

dffbzjpn  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(219)

我试图理解Apache·Kafka的行为 Consumer.committableSource 在几个错误场景中。我现在试图解决的场景是,我用它所连接的kafka集群启动应用程序。kafka集群最终会再次恢复,此时我希望应用程序检测到它,重新建立连接,从而恢复操作。
根据我目前所做的测试,我相信 committableSource 已经实现了一些逻辑,因为如果我在没有连接到集群的情况下启动应用程序,并且在几分钟后启动集群,那么在启动集群后不久,应用程序就会根据需要使用消息。我觉得很奇怪,因为有一个 RestartSource.onFailuresWithBackoff 所以我的第一个问题是:这种行为有保证吗?我已经仔细阅读了文档,但是我还没有找到任何关于这些条件下的预期的参考资料。
然而,我正在努力解决的是,我无法捕获任何类型的异常或日志消息,表明这些连接正在重试或保持。
这是我正在运行的代码的简化版本

var actorSystem = ActorSystem.create(Behaviors.empty(), "X");
var classicActorSystem = toClassic(actorSystem);

ConsumerSettings<String, String> kafkaConsumerSettings =
ConsumerSettings.create(classicActorSystem, new StringDeserializer(), new StringDeserializer())
    .withBootstrapServers(config.getSourceBootstrapServers())
    .withGroupId(config.getGroupId())
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    .withStopTimeout(Duration.ofSeconds(5))
    .withMetadataRequestTimeout(Duration.ofSeconds(5))
    .withPollTimeout(Duration.ofSeconds(5));

var control = Consumer.committableSource(kafkaConsumerSettings, Subscriptions.topics(config.getSourceTopics()))
        .map(this::mapMessages)
        .runWith(Sink.ignore(),actorSystem);

control.thenAccept(s -> System.out.println("Consuming finished"));

我已经在application.conf中全局激活了日志

akka {
  loglevel = "DEBUG"
  stdout-loglevel = "DEBUG"
  log-config-on-start = on
  stream {
    materializer {
      debug-logging = on
    }
  }
}

试图修改源代码,用

Function<Throwable, Supervision.Directive> decider =
    exc -> {
        System.out.println("Decider Log");
        return (Supervision.Directive) Supervision.resume();
    };

//----------

.withAttributes(ActorAttributes.withSupervisionStrategy(decider))

我觉得很奇怪,这个框架没有内置这个日志,但是尽管我做了这些和其他的尝试,我还是无法访问它,有人知道怎么做吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题