目前我正在使用合流kafkanuget包,但不使用合流平台本身(https://github.com/confluentinc/confluent-kafka-dotnet).
根据应用程序的需要,需要创建一个使用者,获取第一条消息,然后关闭该使用者。整个循环应尽可能快(最多1秒)。我遇到的问题-读取第一条消息需要很多poll()周期(数据已经生成并存在于主题中):
使用subscribe()方法时大约8秒(因为重新平衡),使用assign()时大约5秒(没有重新平衡)
我在消费者方面尝试了很多配置的东西(因为生产者没有参与其中),但是没有任何帮助。是预期行为吗?也许需要在代理端配置一些东西?
您可以在下面找到我的简单消费者的代码:
class Program
{
private static Dictionary<string, object> _config =>
new Dictionary<string, object>
{
{ "group.id", "test-consumer" },
{ "enable.auto.commit", false },
{ "bootstrap.servers", "192.168.56.102:9092" },
{ "default.topic.config", new Dictionary<string, object>()
{
{ "auto.offset.reset", "smallest" }
}
}
};
static void Main(string[] args)
{
Stopwatch sw = new Stopwatch();
sw.Start();
var consumer = new Consumer<Ignore, string>(_config, null, new StringDeserializer(Encoding.UTF8));
consumer.Assign(new List<TopicPartition> {new TopicPartition("TestQueue", 0)});
Message<Ignore, string> msg = null;
bool cancel = false;
consumer.OnMessage += (sender, message) =>
{
msg = message;
cancel = true;
};
while (!cancel)
{
consumer.Poll(100);
}
consumer.CommitAsync(msg);
consumer.Dispose();
sw.Stop();
}
}
暂无答案!
目前还没有任何答案,快来回答吧!