如何在点击按钮时使用最新消息并将其写入txt文件?

brtdzjyr  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(260)

我已经写了一个简单的生产者发送按钮点击一个消息到Kafka。

class Producer : IProducer
{
    public void produce(string msg, string topic)
    {
        var config = new Dictionary<string, object>
        {
            {"bootstrap.servers", "localhost:9092" }
        };

        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
             var m = producer.ProduceAsync(topic, null, msg).Result;
        }
    }
}

我的问题是-如何编写一个消费者,将提出对按钮点击和接收最新发布的消息从Kafka,并保存到一个txt文件在本地磁盘上的某处?这就是我现在写的:

public class Consumer
{
    public void Consume(string topic)
    {
        var config = new Dictionary<string, object>
        {
            {"group.id", "consumer-latest-msg" },
            {"bootstrap.servers", "localhost:9092" },
            {"auto.commit.interval.ms", 5000 },
            {"auto.offset.reset", "latest" }
        };

        using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))
        {
                //what to do?
        }
    }
}
yeotifhr

yeotifhr1#

注意:在我开始写答案之前,我只是一个免责声明:我不熟悉c#,所以我将尝试从概念上回答这个问题。
我认为你需要在你的消费空间里做的就是:
通过调用 subscribe() 消费者方法
调用 poll() 方法-->这将返回新写入的记录
在循环中处理所有记录,即根据需要将每条消息(键和/或值)打包到新文件中。
我在java中很快尝试了这一点,下面是代码片段(包含上述3个步骤),供您参考:

consumer.subscribe(Collections.singleton("test-topic"));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for(ConsumerRecord<String, String> record : records) {

                fWriter = new FileWriter("data/msg-" + i++);
                pWriter = new PrintWriter(fWriter);
                pWriter.print(record.value());
                pWriter.close();
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        consumer.close();
    }

在运行这个客户机并从控制台生产者生成两条消息时,在指定的目录(data)中创建了两个文件。
此外,无限循环的原因是为了确保消费者保持运行,因为如果消费者没有定期进行投票,那么消费者组协调器会假设消费者已经死亡,并且消费者组内会触发重新平衡。
我希望这有帮助!

相关问题