我已经写了一个简单的生产者发送按钮点击一个消息到Kafka。
class Producer : IProducer
{
public void produce(string msg, string topic)
{
var config = new Dictionary<string, object>
{
{"bootstrap.servers", "localhost:9092" }
};
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
var m = producer.ProduceAsync(topic, null, msg).Result;
}
}
}
我的问题是-如何编写一个消费者,将提出对按钮点击和接收最新发布的消息从Kafka,并保存到一个txt文件在本地磁盘上的某处?这就是我现在写的:
public class Consumer
{
public void Consume(string topic)
{
var config = new Dictionary<string, object>
{
{"group.id", "consumer-latest-msg" },
{"bootstrap.servers", "localhost:9092" },
{"auto.commit.interval.ms", 5000 },
{"auto.offset.reset", "latest" }
};
using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
//what to do?
}
}
}
1条答案
按热度按时间yeotifhr1#
注意:在我开始写答案之前,我只是一个免责声明:我不熟悉c#,所以我将尝试从概念上回答这个问题。
我认为你需要在你的消费空间里做的就是:
通过调用
subscribe()
消费者方法调用
poll()
方法-->这将返回新写入的记录在循环中处理所有记录,即根据需要将每条消息(键和/或值)打包到新文件中。
我在java中很快尝试了这一点,下面是代码片段(包含上述3个步骤),供您参考:
在运行这个客户机并从控制台生产者生成两条消息时,在指定的目录(data)中创建了两个文件。
此外,无限循环的原因是为了确保消费者保持运行,因为如果消费者没有定期进行投票,那么消费者组协调器会假设消费者已经死亡,并且消费者组内会触发重新平衡。
我希望这有帮助!