这个脚本是从kafka订阅事件的方法。
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
static void Main(string[] args)
{
string brokerList = "broker";
var topics = new List<string>() { "topicName" };
var config = new Dictionary<string, object>
{
{ "group.id", "ConsumerGroup" },
{ "bootstrap.servers", brokerList },
{ "auto.offset.reset", "earliest" },
{ "enable.auto.commit", false }
};
using (var consumer = new Consumer<string, string>(config, new StringDeserializer(Encoding.UTF8), new StringDeserializer(Encoding.UTF8)))
{
consumer.OnMessage += (obj, msg) =>
{
...
};
consumer.Subscribe(topics);
while (true)
{
consumer.Poll(TimeSpan.FromMilliseconds(1000));
}
}
}
在调试模式下跟踪代码时,订阅事件的顺序是: consumer.Subscribe(topics)
consumer.Poll(TimeSpan.FromMilliseconds(1000)); consumer.OnMessage += (obj, msg) =>
获取新事件之前(转到 consumer.OnMessage
),它花了一点时间来投票(在 consumer.Poll
)并在控制台窗口打印一些信息。
具体如下:
4|2018-12-12 10:41:53.381|rdkafka#consumer-1|REQTMOUT| [thrd:broker/bootstrap]: broker/bootstrap: Timed out 1 in-flight, 0 retry-queued, 0 out-queue, 0 partially-sent requests
在我最初的想法中,它使用 consumer.Subscribe(topics)
连接代理并使用 consumer.Poll
使用新事件。
但看起来 consumer.Poll
包括连接到代理和使用新事件。
我的问题是:
哪个函数可以连接到代理? consumer.Subscribe
或者 consumer.Poll
或者?
为什么? consumer.Poll
在控制台窗口打印信息?似乎有一些错误(飞行中超时1)。
1条答案
按热度按时间hivapdat1#
关于你的第一个问题。
哪个函数可以连接到代理?consumer.subscribe或consumer.poll或?
consumer.Subscribe
连接到代理并consumer.Poll
使用消息。关于第二个。
为什么consumer.poll在控制台窗口打印信息?似乎有一些错误(飞行中超时1)。
Kafka在这里发布了一个新的主要版本。如果您对以前的版本有任何问题,可以使用推荐的版本。基于github存储库:
它比0.11.x版本有更多的特性,有很大的改进,而且性能更高