要获取生成的记录详细信息,我们有两个选项可供选择
注意:我使用的生产者属性大多是默认的(例如:批处理大小、acks、最大块大小...)
esyap4oy1#
onCompletion是产生Kafka数据的异步方式,并且具有get的循环将是以Kafka写入数据的同步方式。Kafka中的生产者以非常高的吞吐量在主题上写入数据。如果在生产者代码中使用sync get函数,则在每次写入之后,生产者需要等待来自Kafka的ack。这限制了生产者吞吐量。生产者需要等待Kafka存储数据。复制(基于其配置方式),然后在成功写入时将ack返回给生产者。另一个选择是onCompletion,这里生产者将继续生产数据,而不等待Kafka的ack。如果写入成功,Kafka将调用回调onCompletion。生产者需要跟踪这些onCompletion调用,如果失败,它需要重试。生产者通常会向Kafka发送一批N条记录,然后等待所有完成事件,再发送下一批N条记录,这有点像TCP滑动窗口流控制范例。很难建议您应该做什么。使用onCompletion并从那里使用重试的缺点是会危及Kafka中记录的顺序。生产者可能已经成功地发送了1..65条记录,然后Kafka错过了65-72条,然后Kafka写了73..99条。一旦Kafka完成了写99条,生产者可能会得到66、67条作为onCompletion(因为它是异步回调,它可以随时到来)回调并重试。这本质上使记录排序混乱。在这些情况下,使用者需要了解,可能不会对所有写入进行排序。我的建议是对一批记录使用onCompletion,一般来说,应用程序没有非常严格的排序要求,因此可以利用调用的异步特性来提高吞吐量。
onCompletion
get
ack
aiazj4mn2#
onCompletion()是Java Kafka客户端中定义的异步回调方法。另一方面,get()是一个内置的Java函数,当您使用Java Kafka客户端时,可以将get()与future一起使用以实现同步写入,如下面的Confluent文档中的示例所示:
onCompletion()
get()
future
Future<RecordMetadata> future = producer.send(record); RecordMetadata metadata = future.get();
2条答案
按热度按时间esyap4oy1#
onCompletion
是产生Kafka数据的异步方式,并且具有get
的循环将是以Kafka写入数据的同步方式。Kafka中的生产者以非常高的吞吐量在主题上写入数据。如果在生产者代码中使用sync
get
函数,则在每次写入之后,生产者需要等待来自Kafka的ack
。这限制了生产者吞吐量。生产者需要等待Kafka存储数据。复制(基于其配置方式),然后在成功写入时将ack返回给生产者。另一个选择是
onCompletion
,这里生产者将继续生产数据,而不等待Kafka的ack
。如果写入成功,Kafka将调用回调onCompletion
。生产者需要跟踪这些onCompletion
调用,如果失败,它需要重试。生产者通常会向Kafka发送一批N条记录,然后等待所有完成事件,再发送下一批N条记录,这有点像TCP滑动窗口流控制范例。
很难建议您应该做什么。使用
onCompletion
并从那里使用重试的缺点是会危及Kafka中记录的顺序。生产者可能已经成功地发送了1..65条记录,然后Kafka错过了65-72条,然后Kafka写了73..99条。一旦Kafka完成了写99条,生产者可能会得到66、67条作为
onCompletion
(因为它是异步回调,它可以随时到来)回调并重试。这本质上使记录排序混乱。在这些情况下,使用者需要了解,可能不会对所有写入进行排序。
我的建议是对一批记录使用
onCompletion
,一般来说,应用程序没有非常严格的排序要求,因此可以利用调用的异步特性来提高吞吐量。aiazj4mn2#
onCompletion()
是Java Kafka客户端中定义的异步回调方法。另一方面,
get()
是一个内置的Java函数,当您使用Java Kafka客户端时,可以将get()
与future
一起使用以实现同步写入,如下面的Confluent文档中的示例所示: