kafka异步提交请求失败

z31licg0  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(2582)

我观察到,Kafka消费滞后在运行数小时/天之后突然开始增加。
在查看日志时,我看到了很多异常:
org.apache.kafka.clients.consumer.retriablecommitfailedexception:偏移提交失败,出现可检索异常。您应该重试提交最新消耗的偏移量。
我的consumerthread课程:

public class ConsumerThread implements Runnable {
  private final KafkaConsumer<String, Map<String, Object>> consumer;
  public ConsumerThread(
    this.consumer = new KafkaConsumer<>(getConsumerConfig(kafkaConfiguration));
  }

  @Override
  public void run() {
    try {
      consumer.subscribe(topicList);

      while (true) {
        ConsumerRecords<String, Map<String, Object>> records =
            consumer.poll(Duration.ofMillis(kafkaConfiguration.getPollIntervalMs()));

        long startPerPoll = System.nanoTime();
        for (final ConsumerRecord<String, Map<String, Object>> record : records) {
            // message processing logic here
        }

        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
              //log.error(exception.getMessage());
              log.info("exception while committing offset, consumerThread: {}, exception: {}", Thread.currentThread().getName(), exception);
              exception.printStackTrace();
            }
        });

      }
    } catch (Exception e) {
      // ignore for shutdown
      log.info("exception in run for consumerThread: {}", e);
    } finally {
      try {
        if (Objects.nonNull(consumer)) {
          consumer.commitSync();
        }
      } finally {
        if (Objects.nonNull(consumer)) {
          consumer.close();
        }
      }
    }
}

我的Kafka配置:

groupId: cep-cg
autoCommitEnabled: false
sessionTimeoutMs: 30000
heartBeatIntervalMs: 10000
autoOffsetReset: latest
maxPollRecord: 250
maxPollIntervalMs: 180000
requestTimeoutMs: 240000
pollIntervalMs: 3000

我检查了stackoverflow上的其他答案,并做了一些调整,但似乎都不起作用。
我想知道的是:
有什么线索可以解释为什么延迟会突然增加?
是否有可能有很多commitasync请求在代理上挂起,并且可能有一段时间(由代理上的某个配置定义)commitasync请求开始失败?
比方说,一个消费者阅读了更多 max.poll.interval.ms 是时候处理消息了。在这种情况下,它将被踢出组并触发重新平衡。现在所有的 commitAsync 代理上挂起的请求失败 CommitFailedException 因为分区现在属于组中的其他使用者。在上面的代码中,使用者将跳出无限循环,并将永远关闭。这样对吗?还是我该抓住 CommitFailedException 再次恢复循环以保持消费者的活力?

uwopmtnx

uwopmtnx1#

假设我们发送了一个提交偏移量2000的请求。有一个临时通信问题,所以代理永远不会得到请求,因此永远不会响应。同时,我们处理了另一个批并成功提交了偏移量3000。现在重试以前提交失败的批,在异常中,它将显示相同的消息。在重新平衡的情况下,这将导致更多的重复
答。滞后时间在增加
由于消费者不是不断地消费唱片,而是制作者不断地制作唱片,因此再平衡的发生越来越频繁。
b。commitasync请求超时
只允许组中的活动成员提交偏移量。如果使用者在尝试提交偏移量时被踢出组,它将抛出commitfailedexception
c、 再平衡
当重新平衡开始时,使用者必须在会话超时过期之前完成它当前正在执行的任何处理、提交偏移量并重新加入组。
我们应该提示ConsumerBalanceListener并使用onpartitionsrevoked()提交偏移量,然后再失去分区的所有权来提交当前偏移量。
max.poll.interval.ms和max.poll.records设置为相当低的值,同时保持session.timeout.ms较低,这样就不需要牺牲故障检测时间。
从commitsync()引发的commitfailedexception。这保证了只允许组中的活动成员提交偏移量。如果使用者已被踢出组,则其分区将被分配给另一个成员,该成员将提交自己的偏移量。

相关问题