我用docker来运行kafka和其他服务https://github.com/confluentinc/cp-all-in-one 在我的测试项目中使用了kafka、avro和schemaregistry的合流nuget包。
如果要发送json消息,到目前为止我没有问题,但是我正在努力发送avro序列化消息。
我看见了https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/avrospecific 我尝试了同样的方法,但最终我得到了如下例外:
本地:值序列化错误
在confluent.kafka.producer 2.<ProduceAsync>d__52.MoveNext() at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task) at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task) at System.Runtime.CompilerServices.TaskAwaiter
1.c:\users\lu95eb\source\repos\kafka\u playground\kafka producer\kafkaservice.d\u10.movenext()中的getresult():第126行
内部例外
对象引用未设置为对象的示例。
位于confluent.schemaregistry.serdes.specificserializerimpl 1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize) at Confluent.SchemaRegistry.Serdes.AvroSerializer
1.在system.runtime.compilerservices.taskawaiter.ThrowfornSuccess(task task)在system.runtime.compilerservices.taskawaiter.handlenonsuccessanddebuggernotification(task task)在system.runtime.compilerservices.taskawaiter.validateend(task task task)在confluent.kafka.producer`2.d\ u 52.movenext()
这是我的录音课
public class UserInfo : ISpecificRecord
{
public string Name { get; set; }
public int[] Numbers { get; set; }
public Schema Schema => Schema.Parse(@"
{
""name"": ""UserInfo"",
""type"": ""record"",
""namespace"": ""kafka"",
""fields"": [
{
""name"": ""Name"",
""type"": ""string""
},
{
""name"": ""Numbers"",
""type"": {
""type"": ""array"",
""items"": ""int""
}
}
]
}
");
public object Get(int fieldPos)
{
switch (fieldPos)
{
case 0: return Name;
case 1: return Numbers;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
}
}
public void Put(int fieldPos, object fieldValue)
{
switch (fieldPos)
{
case 0: Name = (string)fieldValue; break;
case 1: Numbers = (int[])fieldValue; break;
default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
}
}
}
以及用于发送消息的方法
private async Task SendSpecificRecord(UserInfo userInfo)
{
using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
using (var producer =
new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
.SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
.SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
.Build())
{
var message = new Message<string, UserInfo>
{
Key = userInfo.Name,
Value = userInfo
};
await producer.ProduceAsync(SpecificTopic, message);
}
}
kafkaservice.cs:第126行是 await producer.ProduceAsync(SpecificTopic, message);
就像我一开始写的,我对schemaregistry没有问题-我注册了schema,它们对json正常工作,我对主题、代理、使用者或其他方面没有问题。
如果有人能指出我做错了什么,我将不胜感激。先谢谢你。
1条答案
按热度按时间k0pti3hp1#
如果有人对解决方案感到好奇(我无法想象有人会是怎样的;)然后,我写了'自定义'avro序列化和反序列化,像一个魅力的作品。