在我的spring boot应用程序中,实现kafka健康检查的更干净有效的方法是什么?

7vux5j2d  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(345)

我有一个springboot(2.1.6)应用程序,它使用(组织范围的)公共kafka示例并向其生成消息。我试图在我的应用程序中使用springactuator实现kafka代理的健康检查,我面临着一系列与性能和日志记录相关的问题。SpringBoot2.0内置了一个健康指示器,但由于一些明显的问题,他们将其删除。
下面是我实现的healthcheck类:

@Component
public class KafkaHealthCheck implements HealthIndicator {

  private static final Logger logger = LoggerFactory.getLogger(KafkaHealthCheck.class);

  private KafkaAdmin kafkaAdmin;

  private Map<String, Object> kafkaConfig;

  @Value("${application.topic}")
  private String topicName;

  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  public KafkaHealthCheck(KafkaAdmin kafkaAdmin) {
    this.kafkaAdmin = kafkaAdmin;

  }

  @PostConstruct
  public void setUpAdminClient() {
    kafkaConfig = new HashMap<>();
    kafkaConfig.putAll(kafkaAdmin.getConfig());
    kafkaConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
  }

  @Override
  public Health health() {
    Long start = System.currentTimeMillis();
    try (AdminClient adminClient = AdminClient.create(kafkaConfig)) {

      DescribeClusterOptions describeClusterOptions = new DescribeClusterOptions().timeoutMs(2000);
      adminClient.describeCluster(describeClusterOptions);

      adminClient.describeConsumerGroups(List.of("topic")).all()
          .get(2, TimeUnit.SECONDS);

      Map<String, TopicDescription> topicDescriptionMap = adminClient
          .describeTopics(List.of(topicName)).all().get(2, TimeUnit.SECONDS);

      List<TopicPartitionInfo> partitions = topicDescriptionMap.get(topicName)
          .partitions();

      if (partitions == null || partitions.isEmpty()) {
        logger.warn(String
            .format("Kafka healthcheck failed - No partition found for topic: %s", topicName));
        return Health.down()
            .withDetail("Kafka healthcheck failed", "No partition found for topic: " + topicName)
            .build();
      } else {
        if (partitions.stream().anyMatch(p -> p.leader() == null)) {
          logger.warn(
              String.format("Kafka healthcheck failed - No partition leader found for topic: %s",
                  topicName));
          return Health.down().withDetail("Kafka healthcheck failed",
              "No partition leader found for topic: " + topicName).build();
        }
      }
    } catch (Exception e) {
      logger.warn("Kafka healthcheck failed", e);
      return Health.down()
          .withDetail("Kafka healthcheck failed", "Exception occurred during healthcheck").build();
    }
    System.out.println(System.currentTimeMillis() - start);
    return Health.up().build();
  }
}

以下是我在实施过程中遇到的问题:
1-kafkaadmin在这个类中被注入了除“bootstrap.servers”之外的所有配置(我使用的是ssl)。我知道了 org.springframework.boot.autoconfigure.kafka.KafkaPropertieslocalhost:9092 作为默认值,应用程序配置不会以某种方式覆盖它,同时它对使用者和生产者都可以正常工作。我不知道为什么,因此我必须在这里手动设置。
2-我向添加了超时 DescribeClusterOptions 以及 describeConsumerGroups 但这些超时似乎被完全忽略了。如果我手动关闭代理,healthcheck大约需要几分钟来报告错误。
3-由于bootstrap.servers错误,当我实际部署应用程序时,它几乎杀死了我的日志服务器,它生成了数百万条日志行 org.apache.kafka.clients.NetworkClientConnection to node -1 could not be established. Broker may not be available. . 我怎样才能阻止它再次发生?即使在经纪公司在经营过程中倒闭的情况下。
4-创建adminclient时,即使是成功的运行状况检查也会生成大量日志行。它会注销它读取的所有配置和一堆其他语句。有没有可能把它最小化?
总的来说,这是非常缓慢的。我试着计算只运行这个健康检查所需的时间,平均约为1.5秒。有机会优化吗?

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题