我已经发送了一个文本文件的数据后,Kafka生产者读取该文件的字符串。现在我想在文本文件中使用相同的数据。我怎么消费它?
var fileName = @"D:\kafka_examples\new2.txt";
var options = new KafkaOptions(new Uri("http://localhost:9092"),
new Uri("http://localhost:9092"));
var router = new BrokerRouter(options);
var consumer = new KafkaNet.Consumer(new ConsumerOptions("Hello-Kafka",
new BrokerRouter(options)));
var text="";
//Consume returns a blocking IEnumerable (ie: never ending stream)
if (File.Exists(fileName))
{
File.Delete(fileName);
}
foreach (var message in consumer.Consume())
{
Console.WriteLine("Response: P{0},O{1} : {2}",
message.Meta.PartitionId, message.Meta.Offset,
text= Encoding.UTF8.GetString(message.Value));
using (StreamWriter sw = File.CreateText(fileName))
{
sw.WriteLine(text);
}
}
我试过这个,但文件不是用给定的文本文件写的。所有消息都来了。我只想要最后一条信息。
1条答案
按热度按时间o75abkj41#
流中没有“最新”消息的概念;它们是无限的。
但您可以做的是在代码开始时查找当前最新的偏移量,然后减去一个(或文件中的行数),然后
seek
那里的消费者群体break
这个for
循环阅读那么多信息。即。
另外,kafka不是http服务。删除
http://
以及代码中重复的localhost地址