kafkaproducer:callback和returned之间的区别?

szqfcxe2  于 2021-06-07  发布在  Kafka
关注(0)|答案(5)|浏览(358)

kafkaproducersend方法既返回future,又接受回调。
在发送完成后,使用一种机制执行操作与使用另一种机制执行操作有什么根本区别吗?

k4ymrczo

k4ymrczo1#

查看您链接到它的文档,未来和回调之间的主要区别在于谁发起了“请求完成了,现在做什么?”问题。
假设我们有一个顾客 C 还有一个面包师 B . 以及 C 他在问 B 给他做个好饼干。现在有两种可能的方式面包师可以返回美味的饼干给客户。

未来

面包师接受了这个要求,并告诉顾客:好的,等我吃完后,我会把你的饼干放在柜台上(本协议为 Future .)
在这种情况下,客户负责检查柜台( Future )看看面包师是否吃完了他的饼干。
阻止顾客停留在柜台附近看它,直到饼干放在那里(future.get()),或者面包师在那里道歉(错误:饼干面团用完)。
非阻塞客户执行一些其他工作,偶尔检查一下cookie是否正在柜台上等待他(future.isdone())。如果cookie准备好了,客户就会接受它(future.get())。

回拨

在这种情况下,顾客在点了他的饼干后,告诉面包师:当我的饼干准备好了,请把它给我的宠物机器狗,他会知道该怎么处理它(这个机器人是回叫)。
现在面包师在饼干准备好后把饼干递给狗,并告诉它跑回主人那里。面包师可以继续为另一位顾客烤下一块饼干。
狗跑回顾客身边,开始摇它的假尾巴,让顾客知道他的饼干已经做好了。
注意,顾客不知道饼干什么时候会给他,也不主动询问面包师是否准备好了。
这是两种情况的主要区别。谁负责发起“你的饼干准备好了,你想用它做什么?”的问题。对于未来,客户负责检查何时准备就绪,或者主动等待,或者时不时地轮询。在回调的情况下,baker将回调所提供的函数。
我希望这个答案能让你更好地了解未来和卡尔巴克到底是什么。一旦你有了大致的想法,你就可以试着找出每个具体的事情是在哪个线程上处理的。当一个线程被阻塞时,或者按照什么顺序完成所有事情。编写一些简单的程序来打印语句,比如:“main client thread:cookiereceived”,这可能是一个有趣的实验方法。

8fsztsew

8fsztsew2#

我的观察基于Kafka制作人文档: Future 允许您访问同步处理 Future 可能无法保证确认。我的理解是 Callback 确认后执行 Callback 允许您访问完全无阻塞的异步处理。
对于同一分区上回调的执行顺序也有保证
对发送到同一分区的记录的回调保证按顺序执行。
我的另一个观点是 Future 返回对象和 Callback “模式”代表两种不同的编程风格,我认为这是根本区别:
这个 Future 表示java的并发模型样式。
这个 Callback 表示java的lambda编程风格(因为回调实际上满足函数接口的要求)
您可能最终会用这两种方法编写类似的行为 Future 以及 Callback 但是在某些用例中,一种样式可能比另一种更有利。

p1tboqfb

p1tboqfb3#

send()是开始在kafka集群上发布消息的方法。send()方法是一个异步调用,表示send方法在缓冲区中累积消息并立即返回。这可以与linger.ms一起用于批量发布消息以获得更好的性能。我们可以使用call send方法处理异常和控制,并在将来使用get方法进行同步处理,或者使用回调进行异步处理。
每种方法都有自己的优缺点,可以根据用例来决定。
异步发送(fire&forget):我们如下调用send方法来调用publish消息,而不等待任何成功或错误响应。

producer.send(new ProducerRecord<String, String>("topic-name", "key", "value"));

此方案不会等待完成第一条消息,而是开始发送其他消息以发布。如果出现异常,producer将根据retry config参数重试,但如果重试后消息仍然失败,kafka producer将永远不知道这一点。在这种情况下,我们可能会发送一些消息,但如果消息丢失很少,那么这将提供高吞吐量和高延迟。
同步发送同步发送消息的一种简单方法是使用get()方法

RecordMetadata recMetadata = producer.send(new ProducerRecord<String, String>("topic-name", "key", "value")).get();

producer.send返回recordmetadata的未来,当我们调用.get()方法时,它将从kafka得到一个回复。我们可以在出错时捕获错误,或者在成功时返回recordmetadata。recordmetadata包含偏移量、分区、时间戳来记录信息。它的速度很慢,但提供了很高的可靠性和保证传递信息。
异步send with callback我们还可以使用回调函数调用send()方法,该函数在消息完成后返回响应。如果您喜欢以异步方式发送消息,这是很好的,这意味着不要等待完成作业,而是同时处理有关消息传递的错误或更新状态。

producer.send(record, new Callback(){
    @Override
    onComplete(RecordMetadata recodMetadata, Exception ex){...}
})

注意:请不要将ack&retries与asynchronous send call混淆。ack和retries将应用于每个send调用,无论是同步调用还是异步调用,这与您如何处理返回消息和失败场景无关。例如,如果发送asynchronous send still ack并应用重试规则,但将位于独立线程上,而不阻止其他线程发送并行记录。唯一的挑战,我们将不知道的情况下,失败和时间时,它的消息成功完成。

rsl1atfo

rsl1atfo4#

异步方法

producer.send(record, new Callback(){
    @Override
    onComplete(RecordMetadata rm, Exception ex){...}
})

与同步相比,它提供了更好的吞吐量

RecordMetadata rm = producer.send(record).get();

因为在第一种情况下你不需要等待确认。
同样,在异步方式下,不能保证排序,而在同步方式下,只有在收到确认之后才发送消息。
另一个区别可能是,在异常情况下的同步调用中,您可以在异常发生后立即停止发送消息,而在第二种情况下,某些消息将在您发现错误并执行某些操作之前发送。
还请注意,在异步方法中,“在fligh中”的消息数由 max.in.flight.requests.per.connection 参数。
除了同步和异步方法之外,您还可以使用fire-and-forget方法,这与同步方法几乎相同,但不处理返回的元数据—只需发送消息并希望它到达代理(知道很可能会发生这种情况,并且如果出现可恢复的错误,生产者将重试),但有些信息可能会丢失:

RecordMetadata rm = producer.send(record);

总结一下:
火和忘记-最快的一个,但有些信息可能会丢失;
同步-最慢,如果你不能承受消息丢失的话就使用它;
异步-介于两者之间。

qvsjd97n

qvsjd97n5#

主要区别在于是否要阻止等待确认的调用线程。
下面使用future.get()方法将阻止当前线程,直到发送完成,然后再执行某些操作。

producer.send(record).get()
// Do some action

当使用回调执行某些操作时,代码将在i/o线程中执行,因此对调用线程来说是非阻塞的。

producer.send(record,
               new Callback() {
                   // Do some action
                   }
               });

尽管文件上说它“一般”是在生产商那里执行的:
请注意,回调通常会在生产者的i/o线程中执行,因此应该相当快,否则会延迟从其他线程发送消息。如果您想执行阻塞或计算代价高昂的回调,建议在回调主体中使用自己的执行器来并行处理。

相关问题