java Kafka生成器中的回调方法与获取方法

inkz8wg9  于 2023-02-11  发布在  Java
关注(0)|答案(2)|浏览(328)

要获取生成的记录详细信息,我们有两个选项可供选择

  1. onCompletion()-回调函数
  2. get()方法
    有人能解释一下它们之间的区别是什么,以及如何详细地使用它们吗?(JAVA)

注意:我使用的生产者属性大多是默认的(例如:批处理大小、acks、最大块大小...)

esyap4oy

esyap4oy1#

onCompletion是产生Kafka数据的异步方式,并且具有get的循环将是以Kafka写入数据的同步方式。
Kafka中的生产者以非常高的吞吐量在主题上写入数据。如果在生产者代码中使用sync get函数,则在每次写入之后,生产者需要等待来自Kafka的ack。这限制了生产者吞吐量。生产者需要等待Kafka存储数据。复制(基于其配置方式),然后在成功写入时将ack返回给生产者。
另一个选择是onCompletion,这里生产者将继续生产数据,而不等待Kafka的ack。如果写入成功,Kafka将调用回调onCompletion。生产者需要跟踪这些onCompletion调用,如果失败,它需要重试。
生产者通常会向Kafka发送一批N条记录,然后等待所有完成事件,再发送下一批N条记录,这有点像TCP滑动窗口流控制范例。
很难建议您应该做什么。使用onCompletion并从那里使用重试的缺点是会危及Kafka中记录的顺序。
生产者可能已经成功地发送了1..65条记录,然后Kafka错过了65-72条,然后Kafka写了73..99条。一旦Kafka完成了写99条,生产者可能会得到66、67条作为onCompletion(因为它是异步回调,它可以随时到来)回调并重试。这本质上使记录排序混乱。
在这些情况下,使用者需要了解,可能不会对所有写入进行排序。
我的建议是对一批记录使用onCompletion,一般来说,应用程序没有非常严格的排序要求,因此可以利用调用的异步特性来提高吞吐量。

aiazj4mn

aiazj4mn2#

onCompletion()是Java Kafka客户端中定义的异步回调方法。
另一方面,get()是一个内置的Java函数,当您使用Java Kafka客户端时,可以将get()future一起使用以实现同步写入,如下面的Confluent文档中的示例所示:

Future<RecordMetadata> future = producer.send(record);
RecordMetadata metadata = future.get();

相关问题