confluent.net(rdkafka)在使用者处理上提交

tgabmvqs  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(374)

这里汇合的高级使用者有以下代码(为简洁起见进行了裁剪)。

using (var consumer = new Consumer<Null, string>(constructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
        {
            while (!cancelled)
            {
                Message<Null, string> msg;
                if (!consumer.Consume(out msg, TimeSpan.FromMilliseconds(100)))
                {
                    continue;
                }                    

                if (msg.Offset % 5 == 0)
                {                        
                    consumer.CommitAsync(msg).Result;                        
                }
            }
        }

自动提交为false。我的问题是,如果在仍有未完成的提交时标记“cancelled”触发器,会发生什么情况。消息是否未提交,因此将再次收到?我希望消费者会承诺进行处理,但我在实现中看不到类似的内容。我可以做一些测试,看看会发生什么,但我希望有一个'官方'的答案,以防我的测试不涵盖所有的情况。

wlsrxk51

wlsrxk511#

首先,你应该使用 CommitAsync 只有当 enable.auto.commit 设置为 false (这是高级消费者示例中的情况—这需要更多的注解和wiki输入)。如果您使用的是自动提交(默认情况下),则在处理时会自动提交(调用rd\u kafka\u destroy时在librdkafka端)
在这里,我们在等待 CommitAsync 完成(通过 .Result )它与while循环在同一个线程上调用,因此不会出现“正在运行”的提交reuest。
如果您使用手动提交(在完整的代码段中就是这种情况),则必须在处理之前手动提交—这个示例中确实缺少了它,您将添加它。如上所述,在手动提交中,您必须提交自己(在某些情况下,用户在处理时不希望提交)
我还发现了一个bug,如果你调用commitasync,但是在处理之前不等待它,你可能会有一个线程被卡住或者accessviolationexception,你会试图纠正这个错误(https://github.com/confluentinc/confluent-kafka-dotnet/issues/279)因此,您应该始终等待commitasync完成,然后再暂时退出

相关问题