我想知道这里的研发人员是否有Kafka使用.net的经验。下面的代码片段来自一个客户机,是用java编写的。java的kafka库似乎比.net的库丰富得多。我试图做的是在客户端站点的远程服务器上启动一个kafka生产者,以便通过kafka生产者传递rta状态。
我需要做的是在.net中重新创建下面的代码,特别是最后一行“openInterfaceSubscriber.send()”。我使用的是来自confluent的.net包。任何帮助都将不胜感激。
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
("xxx.xx.xxx.xxx:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "ept-oi-log");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
DefaultKafkaProducerFactory<Integer, String> producer = new
DefaultKafkaProducerFactory<>( producerConfigs(props));
KafkaTemplate<Integer, String> openInterfacesSubscriber = new KafkaTemplate<>
(producer);
for (all in { "AGENTBYACCOUNTMEASURES", "AGENTBYROUTINGSERVICEMEASURES") {
String subRequest = String.format(" {\"userName\":\"%s\",\"password\":\"%s\",\"subscriptionRequestId\":\"5d09vjfgk\",\"request\":\"SUBSCRIBE\", \"measuresStream\":\"% s\",\"version\":\"3.4\"}", "MikeGrey@odl.lab", "Avaya123", measureName);
// THERE IS NO KAFKA SECURITY HERE: USERNAME/PWD ABOVE IS FOR THE subRequest
// STRING ONLY, ANYONE CAN CONNECT TO THIS KAKFA INSTANCE.
openInterfacesSubscriber.send("realtimesubscriptionrequest", 0, i++, subRequest);
1条答案
按热度按时间shyt4zoc1#
欢迎使用堆栈溢出!
.NETAPI仍在开发中,在windows上托管也有点棘手(或者我发现是这样)。
我一直在使用实验性的.net kafka客户端,发现很容易“Map”到您发布的java源代码:
配置Map就变成了一个字典
序列化/反序列化设置可以通过
Producer
建造师。您可能需要更改编码。KafkaTemplate<>.send
Map到Producer.ProduceAsync
请记住,客户机仍在不断发展,下面是一个版本的代码,它与著名的nuget版本一起为我工作: