如何使用kafka中发送的最新文本文件数据,并使用c#写回另一个文本文件?

ego6inou  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(323)

我已经发送了一个文本文件的数据后,Kafka生产者读取该文件的字符串。现在我想在文本文件中使用相同的数据。我怎么消费它?

var fileName = @"D:\kafka_examples\new2.txt";
var options = new KafkaOptions(new Uri("http://localhost:9092"),
              new Uri("http://localhost:9092"));
var router = new BrokerRouter(options);
var consumer = new KafkaNet.Consumer(new ConsumerOptions("Hello-Kafka",
               new BrokerRouter(options)));
var text="";
//Consume returns a blocking IEnumerable (ie: never ending stream)
if (File.Exists(fileName))
{
  File.Delete(fileName);
}

foreach (var message in consumer.Consume())
{
  Console.WriteLine("Response: P{0},O{1} : {2}",
                   message.Meta.PartitionId, message.Meta.Offset,
                  text= Encoding.UTF8.GetString(message.Value));
                using (StreamWriter sw = File.CreateText(fileName))
  {
    sw.WriteLine(text);
  }
}

我试过这个,但文件不是用给定的文本文件写的。所有消息都来了。我只想要最后一条信息。

o75abkj4

o75abkj41#

流中没有“最新”消息的概念;它们是无限的。
但您可以做的是在代码开始时查找当前最新的偏移量,然后减去一个(或文件中的行数),然后 seek 那里的消费者群体 break 这个 for 循环阅读那么多信息。
即。

var filename = ...
var lines = linesInFile(filename)
var consumer = ... // (with a consumer group id)
var latestOffset = seekToEnd(consumer, -1 * lines) // second param is the delta offset from the end

var i = lines;
foreach (var message in consumer.Consume()) {
   ... 

   if (i <= 0) break;
}

另外,kafka不是http服务。删除 http:// 以及代码中重复的localhost地址

相关问题