我在带有kafka net插件的red hat vm上使用kafka 0.8.1.1。如何配置我的消费者停止接收来自kafka的早期消息?
我的消费者代码:
var options = new KafkaOptions(new Uri("tcp://199.53.249.150:9092"), new Uri("tcp://199.53.249.151:9092"));
Stopwatch sp = new Stopwatch();
var router = new BrokerRouter(options);
var consumer = new Consumer(new ConsumerOptions("Test", router));
ThreadStart start2 = () =>
{
while (true)
{
sp.Start();
foreach (var message in consumer.Consume())
{
if (MessageDecoderReceiver.MessageBase(message.Value) != null)
{
PrintMessage(MessageDecoderReceiver.MessageBase(message.Value).ToString());
}
else
{
Console.WriteLine(message.Value);
}
}
sp.Stop();
}
};
var thread2 = new Thread(start2);
thread2.Start();
1条答案
按热度按时间wsxa1bj11#
kafka net中的使用者当前不会自动跟踪正在使用的偏移量。您必须手动执行偏移跟踪。
要在kafka版本0.8.1中存储偏移量,请执行以下操作:
要将使用者设置为在特定偏移点开始导入,请执行以下操作:
注意上面的代码将消费者设置为从日志的最后开始消费,实际上只接收新消息。