通过传入消息在.net中创建kafka producer

k3bvogb1  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(373)

我想知道这里的研发人员是否有Kafka使用.net的经验。下面的代码片段来自一个客户机,是用java编写的。java的kafka库似乎比.net的库丰富得多。我试图做的是在客户端站点的远程服务器上启动一个kafka生产者,以便通过kafka生产者传递rta状态。
我需要做的是在.net中重新创建下面的代码,特别是最后一行“openInterfaceSubscriber.send()”。我使用的是来自confluent的.net包。任何帮助都将不胜感激。

Map<String, Object> props = new HashMap<>();
         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
("xxx.xx.xxx.xxx:9092");
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "ept-oi-log");
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
IntegerDeserializer.class);

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class); 

DefaultKafkaProducerFactory<Integer, String> producer = new 
DefaultKafkaProducerFactory<>( producerConfigs(props));

KafkaTemplate<Integer, String> openInterfacesSubscriber = new KafkaTemplate<> 
(producer); 

for (all in { "AGENTBYACCOUNTMEASURES", "AGENTBYROUTINGSERVICEMEASURES") {

String subRequest = String.format("   {\"userName\":\"%s\",\"password\":\"%s\",\"subscriptionRequestId\":\"5d09vjfgk\",\"request\":\"SUBSCRIBE\", \"measuresStream\":\"% s\",\"version\":\"3.4\"}", "MikeGrey@odl.lab",  "Avaya123", measureName); 

// THERE IS NO KAFKA SECURITY HERE: USERNAME/PWD ABOVE IS FOR THE subRequest
// STRING ONLY, ANYONE CAN CONNECT TO THIS KAKFA INSTANCE.

openInterfacesSubscriber.send("realtimesubscriptionrequest", 0, i++, subRequest);
shyt4zoc

shyt4zoc1#

欢迎使用堆栈溢出!
.NETAPI仍在开发中,在windows上托管也有点棘手(或者我发现是这样)。
我一直在使用实验性的.net kafka客户端,发现很容易“Map”到您发布的java源代码:
配置Map就变成了一个字典
序列化/反序列化设置可以通过 Producer 建造师。您可能需要更改编码。 KafkaTemplate<>.send Map到 Producer.ProduceAsync 请记住,客户机仍在不断发展,下面是一个版本的代码,它与著名的nuget版本一起为我工作:

static void Main(string[] args)
{          
    // Client: .net core console app 2.0 / Confluent.Kafka nuget 1.0.0-experimental-2
    // Server: Kafka 1.0.0
    Dictionary<string, object> config = new Dictionary<string, object>()
    {
        { "bootstrap.servers", "ept-oi-log" },
        { "group.id", "ept-oi-log" },
        { "enable.auto.commit", true },
        { "session.timeout.ms", 15000 },
        { "client.id", "1" },
    };

    Producer(config).Wait();
}

static async Task Producer(IEnumerable<KeyValuePair<string, object>> kafkaConfig)
{
    var kafkaTopic = "realtimesubscriptionrequest";
    using (var producer = new Producer<int, string>(kafkaConfig, new IntSerializer(), new StringSerializer(System.Text.Encoding.UTF8)))
    {
        int i = 0;
        foreach (var measureName in new[] { "AGENTBYACCOUNTMEASURES", "AGENTBYROUTINGSERVICEMEASURES" })
        {
            String subRequest = String.Format(@"   {{""userName"":""{0}"",""password"":""{1}"",""subscriptionRequestId"":""5d09vjfgk"",""request"":""SUBSCRIBE"", ""measuresStream"":""{2}"",""version"":""3.4""}}", 
                "ggghhhh@xxx.lab", "xxxxxxx", measureName);
            await producer.ProduceAsync(kafkaTopic, new Message<int, string>() { Key = i++, Value = subRequest });
        }
    }
}

相关问题