我正在使用c#中的kafka网络库,尝试在本地计算机上运行的kafka服务器上发布来自for循环的消息(https://github.com/jroland/kafka-net),按照链接回购的示例,我的代码如下。
string kafkaServer = "http://" + ip + ":" + port;
Uri uri = new Uri(kafkaServer);
var options = new KafkaOptions(uri);
var router = new BrokerRouter(options);
var client = new Producer(router);
...
foreach(string sumup in LS){
payloadJson["fullPathDataFolder"] = remoteFolder;
payloadJson["globalName"] = GloablName;
payloadJson["name"] = name;
payloadJson["text"] = sumUp + "\n";
payloadJson["type"] = type;
string payload = payloadJson.ToString();
KafkaNet.Protocol.Message msg = new KafkaNet.Protocol.Message(payload);
client.SendMessageAsync(topic, new List<KafkaNet.Protocol.Message> { msg }).Wait();
}
这段代码在第一次迭代中运行,我能够从第二台机器检索第一个字符串,该机器与一个使用者链接到同一个服务器。那么上面的代码仍然停留在指令中:
client.SendMessageAsync(topic, new List<KafkaNet.Protocol.Message> { msg }).Wait();
如果现在我删除.wait()这个代码运行得很快,但是它随机忽略了我发送的一些字符串,我遗漏了什么?
p、 s.i'v也尝试使用confluent.kafka,我也遇到了类似的问题
p、 另外,这个程序按顺序运行很重要。
暂无答案!
目前还没有任何答案,快来回答吧!