Kafka消费者commitsync与commitsync之比较

guykilcj  于 2021-06-07  发布在  Kafka
关注(0)|答案(4)|浏览(1754)

引用自https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_co2-1
缺点是,虽然commitsync()将重试提交,直到提交成功或遇到不可重试的失败,但commitsync()不会重试。
这个短语我不清楚。我假设使用者向代理发送提交请求,如果代理在某个超时时间内没有响应,则表示提交失败。我错了吗?
你能解释一下这两个词的区别吗 commitSync 以及 commitAsync 详细情况?
另外,请提供我应该选择哪种提交类型的用例。

avwztpqn

avwztpqn1#

commitsync和commitsync都使用了kafka偏移管理特性,两者都有缺点。如果消息处理成功并且提交偏移失败(不是原子的),并且同时发生分区重新平衡,那么您处理的消息将被其他使用者再次处理(重复处理)。如果您对重复消息处理没有问题,那么您可以使用commitasync(因为它不阻塞并且提供低延迟,并且提供更高的提交顺序)。所以你应该没事)。否则,在处理和更新偏移量时,使用一个自定义偏移量管理来处理原子性(使用外部偏移量存储)

zfycwa2u

zfycwa2u2#

正如api文档中所说:
commitsync公司
这是一个同步提交,将被阻止,直到提交成功或遇到不可恢复的错误(在这种情况下,它将被抛出给调用方)。
也就是说 commitSync 是一种阻塞方法。调用它将阻塞线程,直到它成功或失败。
例如,

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitSync();
    }
}

对于for循环中的每个迭代,仅在 consumer.commitSync() 成功返回或中断时抛出异常,您的代码将移动到下一个迭代。
commitasync公司
这是一个异步调用,不会阻塞。遇到的任何错误要么传递给回调(如果提供),要么丢弃。
也就是说 commitAsync 是一种非阻塞方法。调用它不会阻塞线程。相反,它将继续处理以下指令,不管它最终是成功还是失败。
例如,与前面的示例类似,但这里我们使用 commitAsync :

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
        consumer.commitAsync(callback);
    }
}

对于for循环中的每个迭代,无论发生什么 consumer.commitAsync() 最终,您的代码将移动到下一个迭代。而且,提交的结果将由您定义的回调函数处理。
权衡:延迟与数据一致性
如果必须确保数据一致性,请选择 commitSync() 因为它将确保在执行任何进一步的操作之前,您将知道偏移提交是成功的还是失败的。但是由于它是同步和阻塞的,您将花费更多的时间等待提交完成,这将导致高延迟。
如果您确定某些数据不一致并且希望具有低延迟,请选择 commitAsync() 因为它不会等待完成。相反,它只是发送提交请求,稍后处理来自kafka(成功或失败)的响应,同时,您的代码将继续执行。
一般来说,实际行为将取决于实际代码和调用方法的位置。

kxxlusnw

kxxlusnw3#

使用commitasync()进行健壮的重试处理

在“kafka-最终指南”一书中,有一个关于如何减轻由于异步提交而提交较低偏移量的潜在问题的提示:
重试异步提交:为异步重试获得正确的提交顺序的一个简单模式是使用单调递增的序列号。每次提交时增加序列号,并将提交时的序列号添加到commitasync回调。准备发送重试时,检查回调得到的提交序列号是否等于示例变量;如果是,则没有新的提交,可以安全地重试。如果示例序列号较高,则不要重试,因为已发送较新的提交。
以下代码描述了一种可能的解决方案:

import java.util._
import java.time.Duration
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import collection.JavaConverters._

object AsyncCommitWithCallback extends App {

  // define topic
  val topic = "myOutputTopic"

  // set properties
  val props = new Properties()
  props.put(ConsumerConfig.GROUP_ID_CONFIG, "AsyncCommitter")
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  // [set more properties...]

  // create KafkaConsumer and subscribe
  val consumer = new KafkaConsumer[String, String](props)
  consumer.subscribe(List(topic).asJavaCollection)

  // initialize global counter
  val atomicLong = new AtomicLong(0)

  // consume message
  try {
    while(true) {
      val records = consumer.poll(Duration.ofMillis(1)).asScala

      if(records.nonEmpty) {
        for (data <- records) {
          // do something with the records
        }
        consumer.commitAsync(new KeepOrderAsyncCommit)
      }

    }
  } catch {
    case ex: KafkaException => ex.printStackTrace()
  } finally {
    consumer.commitSync()
    consumer.close()
  }

  class KeepOrderAsyncCommit extends OffsetCommitCallback {
    // keeping position of this callback instance
    val position = atomicLong.incrementAndGet()

    override def onComplete(offsets: util.Map[TopicPartition, OffsetAndMetadata], exception: Exception): Unit = {
      // retrying only if no other commit incremented the global counter
      if(exception != null){
        if(position == atomicLong.get) {
          consumer.commitAsync(this)
        }
      }
    }
  }

}
nxagd54h

nxagd54h4#

commitAync 不会重试,因为如果重试会造成混乱。
假设您尝试提交偏移量20(异步),但它没有提交(失败),然后下一个轮询块尝试提交偏移量40(异步),它成功了。
现在,commit offset 20仍在等待提交,如果它重定时并成功,将造成混乱。
乱七八糟的是,承诺的补偿应该是40而不是20。

相关问题