在我最近的c#项目中,我正在使用confluent kafka包。我用以下方式创建了一个制作人:
prodConfig = new ProducerConfig { BootstrapServers = "xxx.xxx.xxx.xxx:xxx"};
foreach(msg in msglist){
using(var producer = new ProducerBuilder<Null, string>(prodConfig).Build()){
producer.ProduceAsync(topic, new Message<Null, string> {Value = msg});
}
}
但问题是,我的一些信息没有传达给消费者。他们在某处迷路了。但是,如果我对生产者使用wait,那么所有消息都会被传递。如何在没有等待的情况下传递我所有的信息(我有一个单独的分区)
1条答案
按热度按时间vtwuwzda1#
首先,你应该只使用一个
Producer
把你的msgList
,因为创建新的Producer
因为每封信都很贵。你能做的就是
Produce()
方法Flush()
. 与Produce()
您将异步发送消息,而无需等待响应。然后打电话给Flush()
将阻止,直到所有飞行中的消息都被传递。没有
await
或者Flush()
您可能会丢失消息,因为您的生产者可能在传递所有消息之前被释放。