在Kafka的语境中,异步和要求的概念对我来说很混乱,但我想清楚地理解这些概念。1.如果ask = 1或all,Kafka生产者是否需要等待来自broker的ack响应,并且不能做任何事情?1.如果没有ack响应,Kafka生产者就不能发送下一条消息给代理?如果是这样,看起来就像是同步的,因为生产者正在等待代理的ack结果。你能帮助我理解这些概念吗?谢谢。
rdlzhqv91#
答案就在send()方法中。根据官方文件:public future send(生产者记录记录、回调回调)将记录异步发送到主题,并在发送得到确认后调用提供的回调。发送是异步的,一旦记录存储在等待发送的记录缓冲区中,此方法将立即返回。这允许并行发送许多记录,而不会阻塞以等待每个记录后的响应。正如您所看到的签名-它接受一个回调方法并返回一个Future对象。在调用这个方法时,方法本身并不关心acks配置。它不会阻塞调用,而是通过返回Future对象和接受回调来将决定权留给调用方法。它会在接收消息时将消息推送到缓冲区,并将其余的留给Future和回调。在缓冲区级别,acks开始得到响应,但这是并行完成的,不会阻塞发送调用程序。acks = 0时,生成器将假定消息在发送时就已写入(也称为"触发并忘记")。如果acks = 1,则只有在引导者收到记录时,生产者才会认为写入成功,如果没有确认,生产者会根据您的配置重试,并相应地使用回调。使用acks = all-〉这只会更改确认部分-即生产者将认为写入成功写入所有副本,因为它将收到基于min.insync.replicas的确认。其余部分为acks = 1。对于您收到的future,您可以稍后检查它并继续发送消息,或者调用get()方法-这将导致阻塞。或者,您可以使用回调在收到确认时执行操作。
Future
acks
min.insync.replicas
get()
如果ask = 1或all,Kafka生产者是否需要等待来自broker的ack响应,并且不能做任何事情?这取决于您是否立即使用Future.get()方法-该方法会阻塞。
producer.send(msg).get() //Waiting...
或者忽略返回并将操作委托给回调
producer.send(record, new Callback() { // Do the same thing } });
1条答案
按热度按时间rdlzhqv91#
答案就在send()方法中。
根据官方文件:
public future send(生产者记录记录、回调回调)
将记录异步发送到主题,并在发送得到确认后调用提供的回调。发送是异步的,一旦记录存储在等待发送的记录缓冲区中,此方法将立即返回。这允许并行发送许多记录,而不会阻塞以等待每个记录后的响应。
正如您所看到的签名-它接受一个回调方法并返回一个
Future
对象。在调用这个方法时,方法本身并不关心
acks
配置。它不会阻塞调用,而是通过返回Future对象和接受回调来将决定权留给调用方法。它会在接收消息时将消息推送到缓冲区,并将其余的留给Future和回调。在缓冲区级别,
acks
开始得到响应,但这是并行完成的,不会阻塞发送调用程序。acks = 0时,生成器将假定消息在发送时就已写入(也称为"触发并忘记")。
如果acks = 1,则只有在引导者收到记录时,生产者才会认为写入成功,如果没有确认,生产者会根据您的配置重试,并相应地使用回调。
使用acks = all-〉这只会更改确认部分-即生产者将认为写入成功写入所有副本,因为它将收到基于
min.insync.replicas
的确认。其余部分为acks = 1。对于您收到的future,您可以稍后检查它并继续发送消息,或者调用
get()
方法-这将导致阻塞。或者,您可以使用回调在收到确认时执行操作。
如果ask = 1或all,Kafka生产者是否需要等待来自broker的ack响应,并且不能做任何事情?
这取决于您是否立即使用Future.get()方法-该方法会阻塞。
或者忽略返回并将操作委托给回调