Passing an object in a Kafka message producer in .NET with Confluent's dotnet library

xdyibdwo · posted 2021-06-05 · in Kafka

I'm using .NET Core 3.1 with Confluent's Kafka library:

using Confluent.Kafka;

I'm setting up a Kafka architecture, creating a producer and a consumer.
I know I can easily create and send messages to a topic from the producer like this:

using (var producer = new ProducerBuilder<long, string>(config).Build())
{
    for (var i = 0; i < 1000; i++)
    {
        var deliveryReport = await producer.ProduceAsync(kafkaTopic,
            new Message<long, string> { Key = i, Value = "lorem ipsum " + i });
    }
}

That works fine.
But what if I want to send an object instead? The following doesn't work:

using (var producer = new ProducerBuilder<long, User>(config).Build())
{
    for (var i = 0; i < 1000; i++)
    {
        var user = new User() { Id = i, Name = "random" };
        var deliveryReport = await producer.ProduceAsync(kafkaTopic,
            new Message<long, User> { Key = i, Value = user });
    }
}

What is missing? I've heard there is a way to do something like this, but I can't find it.

fjnneemd 1#

Kafka accepts bytes in any client, not "objects". You need to implement your own serializer, or use the Schema Registry-backed serializers that Confluent provides.

JSON example - https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/jsonserialization
Avro example - https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/avrospecific
Protobuf example - https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples/protobuf
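If you don't need a Schema Registry, the "implement your own serializer" route is short. A minimal sketch (my own illustration, not from the question; it only assumes the `ISerializer<T>`/`IDeserializer<T>` interfaces that Confluent.Kafka exposes) using `System.Text.Json`:

```csharp
using System;
using System.Text.Json;
using Confluent.Kafka;

// Serializes any T to UTF-8 JSON bytes for the message value.
public class JsonValueSerializer<T> : ISerializer<T>
{
    public byte[] Serialize(T data, SerializationContext context)
        => JsonSerializer.SerializeToUtf8Bytes(data);
}

// Mirror image for the consumer side.
public class JsonValueDeserializer<T> : IDeserializer<T>
{
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        => isNull ? default : JsonSerializer.Deserialize<T>(data);
}
```

Then wire it up on the builders from the question: `.SetValueSerializer(new JsonValueSerializer<User>())` on the `ProducerBuilder<long, User>`, and `.SetValueDeserializer(new JsonValueDeserializer<User>())` on the matching `ConsumerBuilder`.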

s4n0splo 2#

You have to serialize the object in this case. As far as I know, plain JSON payloads are out of scope for the library you're using.
So you can use Avro instead (but it requires a Schema Registry). For example:

using Avro;
using Avro.Generic;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using Confluent.SchemaRegistry;
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Confluent.Kafka.Examples.AvroGeneric
{
    class Program
    {
        static async Task Main(string[] args)
        {
            if (args.Length != 3)
            {
                Console.WriteLine("Usage: .. bootstrapServers schemaRegistryUrl topicName");
                return;
            }

            string bootstrapServers = args[0];
            string schemaRegistryUrl = args[1];
            string topicName = args[2];
            string groupName = "avro-generic-example-group";

            // var s = (RecordSchema)RecordSchema.Parse(File.ReadAllText("my-schema.json"));
            var s = (RecordSchema)RecordSchema.Parse(
                @"{
                    ""namespace"": ""Confluent.Kafka.Examples.AvroSpecific"",
                    ""type"": ""record"",
                    ""name"": ""User"",
                    ""fields"": [
                        {""name"": ""name"", ""type"": ""string""},
                        {""name"": ""favorite_number"",  ""type"": [""int"", ""null""]},
                        {""name"": ""favorite_color"", ""type"": [""string"", ""null""]}
                    ]
                  }"
            );

            CancellationTokenSource cts = new CancellationTokenSource();
            var consumeTask = Task.Run(() =>
            {
                using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
                using (var consumer =
                    new ConsumerBuilder<string, GenericRecord>(new ConsumerConfig { BootstrapServers = bootstrapServers, GroupId = groupName })
                        .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry).AsSyncOverAsync())
                        .SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync())
                        .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                        .Build())
                {
                    consumer.Subscribe(topicName);

                    try
                    {
                        while (true)
                        {
                            try
                            {
                                var consumeResult = consumer.Consume(cts.Token);

                                Console.WriteLine($"Key: {consumeResult.Message.Key}\nValue: {consumeResult.Message.Value}");
                            }
                            catch (ConsumeException e)
                            {
                                Console.WriteLine($"Consume error: {e.Error.Reason}");
                            }
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        // commit final offsets and leave the group.
                        consumer.Close();
                    }
                }
            });

            using (var schemaRegistry = new CachedSchemaRegistryClient(new SchemaRegistryConfig { Url = schemaRegistryUrl }))
            using (var producer =
                new ProducerBuilder<string, GenericRecord>(new ProducerConfig { BootstrapServers = bootstrapServers })
                    .SetKeySerializer(new AvroSerializer<string>(schemaRegistry))
                    .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
                    .Build())
            {
                Console.WriteLine($"{producer.Name} producing on {topicName}. Enter user names, q to exit.");

                int i = 0;
                string text;
                while ((text = Console.ReadLine()) != "q")
                {
                    var record = new GenericRecord(s);
                    record.Add("name", text);
                    record.Add("favorite_number", i++);
                    record.Add("favorite_color", "blue");

                    // Print the delivery report (or the error) for each message.
                    Console.WriteLine(await producer
                        .ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record })
                        .ContinueWith(task => task.IsFaulted
                            ? $"error producing message: {task.Exception.Message}"
                            : $"produced to: {task.Result.TopicPartitionOffset}"));
                }
            }

            cts.Cancel();
        }
    }
}
