这里汇合的高级使用者有以下代码(为简洁起见进行了裁剪)。
using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
{
while (!cancelled)
{
Message<Null, string> msg;
if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
{
continue;
}
if (msg.Offset % 5 == 0)
{
consumer.CommitAsync(msg).Result;
}
}
}
自动提交为false。我的问题是,如果在仍有未完成的提交时标记“cancelled”触发器,会发生什么情况。消息是否未提交,因此将再次收到?我希望消费者会承诺进行处理,但我在实现中看不到类似的内容。我可以做一些测试,看看会发生什么,但我希望有一个'官方'的答案,以防我的测试不涵盖所有的情况。
1条答案
按热度按时间wlsrxk511#
首先,你应该使用
CommitAsync
只有当enable.auto.commit
设置为false
(这是高级消费者示例中的情况—这需要更多的注解和wiki输入)。如果您使用的是自动提交(默认情况下),则在处理时会自动提交(调用rd\u kafka\u destroy时在librdkafka端)在这里,我们在等待
CommitAsync
完成(通过.Result
)它与while循环在同一个线程上调用,因此不会出现“正在运行”的提交reuest。如果您使用手动提交(在完整的代码段中就是这种情况),则必须在处理之前手动提交—这个示例中确实缺少了它,您将添加它。如上所述,在手动提交中,您必须提交自己(在某些情况下,用户在处理时不希望提交)
我还发现了一个bug,如果你调用commitasync,但是在处理之前不等待它,你可能会有一个线程被卡住或者accessviolationexception,你会试图纠正这个错误(https://github.com/confluentinc/confluent-kafka-dotnet/issues/279)因此,您应该始终等待commitasync完成,然后再暂时退出