我正在使用.NETCore3.1并使用合流的kafka库
using Confluent.Kafka;
我正在实施Kafka体系,创造生产者和消费者。
我知道我可以很容易地做这样的事情来创建和发送消息到producer中的主题:
using (var producer = new ProducerBuilder<long, string>(config).Build())
{
for(var i=0; i<1000; i++)
{
var deliveryReport = await producer.ProduceAsync(kafkaTopic, new Message<long, string> { Key
= i, Value = "lorem ipsum "+i });
}
}
效果不错。
但是如果我想用一个对象来代替呢?以下操作无效:
using (var producer = new ProducerBuilder<long, User>(config).Build())
{
for(var i=0; i<1000; i++)
{
var user = new User() { Id = i, Name = "random" };
var deliveryReport = await producer.ProduceAsync(kafkaTopic, new Message<long, User> { Key
= i, Value = user });
}
}
它少了什么?我听说有一种方法可以做类似的事情,但是找不到。
2条答案
按热度按时间fjnneemd1#
Kafka接受任何客户机中的字节,而不是“对象”。您需要实现自己的序列化程序,或者使用合流模式注册表支持的序列化程序。
json示例-https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/jsonserialization
avro示例-https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/avrospecific
原型示例-https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/protobuf
s4n0splo2#
在这种情况下,必须序列化对象。据我所知,json有效负载超出了您所使用的库的范围。
因此,您可以使用avro(但需要schema registry)。举个例子: