C# Confluent Kafka problem with Avro serialization

fiei3ece · posted 2021-06-04 in Kafka

I use Docker to run Kafka and the other services from https://github.com/confluentinc/cp-all-in-one and in my test project I use the Confluent NuGet packages for Kafka, Avro and Schema Registry.
Sending JSON messages works fine so far, but I am struggling to send Avro-serialized messages.
I looked at https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/avrospecific and tried the same approach, but I end up with an exception like this:
Local: Value serialization error
   at Confluent.Kafka.Producer`2.<ProduceAsync>d__52.MoveNext()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   at KafkaService.<SendSpecificRecord>d__10.MoveNext() in c:\users\lu95eb\source\repos\kafka_playground\kafka producer\kafkaservice.cs:line 126

Inner exception:
Object reference not set to an instance of an object.
   at Confluent.SchemaRegistry.Serdes.SpecificSerializerImpl`1..ctor(ISchemaRegistryClient schemaRegistryClient, Boolean autoRegisterSchema, Int32 initialBufferSize)
   at Confluent.SchemaRegistry.Serdes.AvroSerializer`1.SerializeAsync(T value, SerializationContext context)
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.ValidateEnd(Task task)
   at Confluent.Kafka.Producer`2.<ProduceAsync>d__52.MoveNext()
Here is my record class:

public class UserInfo : ISpecificRecord
{
    public string Name { get; set; }
    public int[] Numbers { get; set; }

    public Schema Schema => Schema.Parse(@"
        {
          ""name"": ""UserInfo"",
          ""type"": ""record"",
          ""namespace"": ""kafka"",
          ""fields"": [
            {
              ""name"": ""Name"",
              ""type"": ""string""
            },
            {
              ""name"": ""Numbers"",
              ""type"": {
                ""type"": ""array"",
                ""items"": ""int""
              }
            }
          ]
        }
        ");

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Numbers;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Numbers = (int[])fieldValue; break;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
        }
    }
}
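
For comparison, the generated classes from the AvroSpecific example (produced by avrogen) do not parse the schema inside the instance property; they keep the parsed schema in a public static _SCHEMA field and return that field from the Schema property. Below is a rough sketch of that shape for my schema; the class name is mine, and whether the Confluent serializer actually depends on that static field is only my guess.

using Avro;
using Avro.Specific;

public class UserInfoGenerated : ISpecificRecord
{
    // Generated classes keep the parsed schema in a public static field...
    public static Schema _SCHEMA = Schema.Parse(
        @"{""name"":""UserInfo"",""type"":""record"",""namespace"":""kafka"",""fields"":[{""name"":""Name"",""type"":""string""},{""name"":""Numbers"",""type"":{""type"":""array"",""items"":""int""}}]}");

    public string Name { get; set; }
    public int[] Numbers { get; set; }

    // ...and the instance property just returns it.
    public Schema Schema => _SCHEMA;

    public object Get(int fieldPos)
    {
        switch (fieldPos)
        {
            case 0: return Name;
            case 1: return Numbers;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Get()");
        }
    }

    public void Put(int fieldPos, object fieldValue)
    {
        switch (fieldPos)
        {
            case 0: Name = (string)fieldValue; break;
            case 1: Numbers = (int[])fieldValue; break;
            default: throw new AvroRuntimeException($"Bad index {fieldPos} in Put()");
        }
    }
}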

And here is the method I use to send the message:

private async Task SendSpecificRecord(UserInfo userInfo)
{
    using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = _schemaRegistryUrl }))
    using (var producer =
        new ProducerBuilder<string, UserInfo>(new ProducerConfig { BootstrapServers = _brokerUrl })
            .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
            .SetValueSerializer(new AvroSerializer<UserInfo>(schemaRegistry))
            .Build())
    {
        var message = new Message<string, UserInfo>
        {
            Key = userInfo.Name,
            Value = userInfo
        };

        await producer.ProduceAsync(SpecificTopic, message);
    }
}

KafkaService.cs line 126 is await producer.ProduceAsync(SpecificTopic, message);. As I wrote at the beginning, I have no problems with the Schema Registry itself: I registered schemas and they work fine for JSON, and I have no issues with topics, the broker, consumers or anything else.
I would appreciate it if somebody could point out what I am doing wrong. Thanks in advance.


k0pti3hp · answer #1

In case anybody is curious about the solution (I can't imagine anybody would be ;)): I wrote a 'custom' Avro serializer and deserializer, and it works like a charm.

using System;
using System.IO;
using System.Threading.Tasks;
using Avro.IO;
using Avro.Specific;
using Confluent.Kafka;

public class CustomAvroSerializer<T> : IAsyncSerializer<T>
    where T : class, ISpecificRecord
{
    public Task<byte[]> SerializeAsync(T data, SerializationContext context)
    {
        return Task.Run(() =>
        {
            // Write the record as plain Avro binary, using the schema exposed by ISpecificRecord.
            using (var ms = new MemoryStream())
            {
                var enc = new BinaryEncoder(ms);
                var writer = new SpecificDefaultWriter(data.Schema);
                writer.Write(data, enc);
                return ms.ToArray();
            }
        });
    }
}

public class CustomAvroDeserializer<T> : IDeserializer<T>
    where T : class, ISpecificRecord
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // Null payloads (e.g. tombstones) carry no Avro data to decode.
        if (isNull)
        {
            return null;
        }

        using (var ms = new MemoryStream(data.ToArray()))
        {
            var dec = new BinaryDecoder(ms);
            // Create an empty instance and let the Avro reader populate it via Put().
            var regenObj = (T)Activator.CreateInstance(typeof(T));

            var reader = new SpecificDefaultReader(regenObj.Schema, regenObj.Schema);
            reader.Read(regenObj, dec);
            return regenObj;
        }
    }
}
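
For completeness, here is roughly how I wire them into the producer and consumer builders; the broker URL, topic and group id below are placeholders for whatever you use. Keep in mind that this writes plain Avro binary without the Confluent wire format (magic byte plus schema id), so it only round-trips with the matching custom deserializer, not with Schema Registry-aware Avro consumers.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class CustomSerdeUsage
{
    // Round-trips a UserInfo through Kafka using the custom serde above.
    public static async Task RoundTripAsync(string brokerUrl, string topic, UserInfo userInfo)
    {
        using (var producer = new ProducerBuilder<string, UserInfo>(
                new ProducerConfig { BootstrapServers = brokerUrl })
            .SetValueSerializer(new CustomAvroSerializer<UserInfo>())
            .Build())
        {
            await producer.ProduceAsync(topic, new Message<string, UserInfo>
            {
                Key = userInfo.Name,
                Value = userInfo
            });
        }

        var consumerConfig = new ConsumerConfig
        {
            BootstrapServers = brokerUrl,
            GroupId = "userinfo-test",               // placeholder group id
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
        using (var consumer = new ConsumerBuilder<string, UserInfo>(consumerConfig)
            .SetValueDeserializer(new CustomAvroDeserializer<UserInfo>())
            .Build())
        {
            consumer.Subscribe(topic);
            var result = consumer.Consume(TimeSpan.FromSeconds(10));
            Console.WriteLine(result?.Message.Value.Name);  // deserialized back into a UserInfo
        }
    }
}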
