合流kafka dot net produceasync任务

nqwrtyyt  于 2021-06-08  发布在  Kafka
关注(0)|答案(2)|浏览(382)
var msgs = new List<string> {“msg1”, “msg2”, “msg3”};

var tasks = new List<Task>();

Foreach(var msg in msgs) {
         tasks.add(_producer.ProduceAsync(...)); }

var deliveryReports = Task.WhenAll(tasks).Result;

我的Kafka制作人配置:
批量:10
linger:100 ms
我的问题是,任务是否按创建顺序完成。我能保证代表msg1的任务在代表msg2或msg3的任务之前完成吗。
谢谢。

xfyts7mz

xfyts7mz1#

好吧,我想我现在明白了制作人和经纪人是如何实现订购的。
因此,当调用produceasync时,它将消息添加到发送缓冲区,创建用于完成future并返回future的promise。因此,它创建任务完成源对象并返回其任务。
客户机库(librdkafka)等待,直到它接收到配置的消息数或超时时间,才能对消息进行批处理。将创建一个批处理,其中包含与发送缓冲区中相同顺序的消息。根据目标分区/主题对批进行分区(如果使用默认分区器,则随机进行),即拆分为更小的批。每个拆分后的批处理都被发送到各自的leader broker/isr(单个send()按顺序发生),并且每个批处理都由各自的leader broker根据request.required.acks进行确认。客户机库在接收到的每个ack上调用回调,回调完成其各自的未来,即taskcompletionsource.set();

u5rb5r59

u5rb5r592#

这里有几件事。
首先,librdkafka具有为您管理重试的功能,并且默认情况下是这样的(“retries”设置为2),因此这可能会导致对消息传递和传递报告进行重新排序。为确保不会发生这种情况,您可以将“max.in.flight”设置为1(或将“retries”设置为0并自行管理)。
将librdkafka配置为按照消息发送的顺序向.net提供传递报告,问题就变成了任务完成排序的保证之一。我需要思考5分钟以上才能给出一个好的答案,但现在假设订购是不能保证的(我稍后会写更多)。通过使用接受ideliveryreport处理程序的produceasync变体,可以获得有保证的排序。请注意,在版本1.0中,这些方法将有所更改,并将被称为beginproduce。

相关问题