如何将.net类序列化为avro.generic.genericord以发布到kafka主题中?

u0sqgete  于 2021-06-07  发布在  Kafka
关注(0)|答案(2)|浏览(446)

我正在尝试找到一种方法/帮助程序来将.net类转换为avro.generic.genericrecord。目前,我正在手动将字段名和字段值添加到通用记录中。是否有一个序列化程序/转换器,我可以使用它将对象转换为通用记录并发布到Kafka主题。

class Plant
{
 public long Id { get; set; }
 public string Name { get; set; }
 public List<PlantProperties> PlantProperties{ get; set; }
}
class PlantProperties
{
 public long Leaves{ get; set; }
 public string Color{ get; set; }
}

请建议。

zqry0prt

zqry0prt1#

下面是我使用@cricket\u007的建议来解决问题的步骤。
为了避免编写avro模式的复杂性,首先创建c类,然后使用avroserializer生成模式。
avroserializer.create().writerschema.tostring()
这将为类生成模式json。将其移动到架构文件并
根据需要使所有类型都具有空值
然后使用avro\u gen.exe工具重新生成实现isspecific记录的类文件。
添加用于发布到队列的以下代码

using (var serdeProvider = new AvroSerdeProvider(avroConfig))
        using (var producer = new Producer<string, MYClass>(producerConfig, 
  serdeProvider.GetSerializerGenerator<string>(), 
  serdeProvider.GetSerializerGenerator<MYClass>()))
        {
            Console.WriteLine($"{producer.Name} producing on 
       {_appSettings.PullListKafka.Topic}.");  

            producer.ProduceAsync(_appSettings.PullListKafka.Topic, new 
Message<string, MYClass> { Key = Guid.NewGuid().ToString(), Value = MYClassObject})
                    .ContinueWith(task => task.IsFaulted
                        ? $"error producing message: {task.Exception.Message}"
                        : $"produced to: {task.Result.TopicPartitionOffset}");

        }

一些链接可以帮助您做到这一点。
https://shanidgafur.github.io/blog/apache-avro-on-dotnethttpshttp://github.com/sidshetye/helloavro/tree/master/avro

iaqfqrcu

iaqfqrcu2#

假设您使用的是合流模式regsitry,则可以使用它们的.net客户端1
https://github.com/confluentinc/confluent-kafka-dotnet
从examples文件夹复制

using (var serdeProvider = new AvroSerdeProvider(avroConfig))
    using (var producer = new Producer<string, GenericRecord>(producerConfig, serdeProvider.GetSerializerGenerator<string>(), serdeProvider.GetSerializerGenerator<GenericRecord>()))
    {
        Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

        int i = 0;
        string text;
        while ((text = Console.ReadLine()) != "q")
        {
            var record = new GenericRecord(s);
            record.Add("name", text);
            record.Add("favorite_number", i++);
            record.Add("favorite_color", "blue");

            producer
                .ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                .ContinueWith(task => task.IsFaulted
                    ? $"error producing message: {task.Exception.Message}"
                    : $"produced to: {task.Result.TopicPartitionOffset}");
        }
    }

    cts.Cancel();
}

在你的情况下,更新 record.Add 相应地使用
但是,由于您有一个类,因此,您应该尝试使用specificrecord,而不是通过genericrecord在avro和.net类之间来回序列化。请参阅avrogen工具上的自述部分以获取此示例
1我不知道有没有其他的.net库

相关问题