如何使用confluent.kafka.net客户端创建Kafka主题

scyqe7ek  于 2021-06-06  发布在  Kafka
关注(0)|答案(3)|浏览(535)

它似乎是Kafka最流行的.net客户端(https://github.com/confluentinc/confluent-kafka-dotnet)缺少设置和创建主题的方法。打电话的时候 Producer.ProduceAsync() 主题是自动创建的,但我找不到设置分区、保留策略和其他设置的方法。
我试图在网上找到任何例子,但我找到的只是使用默认值。
也许我可以用另一个.net客户端来代替?

vyu0f0g1

vyu0f0g11#

Confluent.Kafka.AdminClient 有以下版本 1.0.0-experimental-2 但不允许创建主题等。
它建立在 librdkafka 它还没有这个的API。
因此,现在您必须在代理上使用。 bin\windows\kafka-topics.sh --create ...

bmvo0sr5

bmvo0sr52#

confluent还没有提供任何API来从DotNet客户端创建主题,但是有一个解决方法。
auto.create.topics.enable = true Kafka构型
使用 var brokerMetadata = producer.GetMetadata(false, topicName); 要查询现有代理中可用的主题,如果指定的主题不可用,则kafka将创建具有指定名称的主题。

private static bool CreateTopicIfNotExist(Producer producer, string topicName)
    {
        bool isTopicExist = producer.GetMetadata().Topics.Any(t => t.Topic == topicName);
        if (!isTopicExist)
        {
            //Creates topic if it is not exist; Only in case of auto.create.topics.enable = true is set into kafka configuration
            var topicMetadata = producer.GetMetadata(false, topicName).Topics.FirstOrDefault();
            if (topicMetadata != null && (topicMetadata.Error.Code != ErrorCode.UnknownTopicOrPart || topicMetadata.Error.Code == ErrorCode.Local_UnknownTopic))
                isTopicExist = true;
        }
        return isTopicExist;
    }

因此,你可以利用这个工作,我知道这是肮脏的解决方案,但似乎没有其他办法,因为现在。

ovfsdjhp

ovfsdjhp3#

它现在可以在confluent.kafka.net客户端库的最新版本中获得。
请参见:https://github.com/confluentinc/confluent-kafka-dotnet/blob/b7b04fed82762c67c2841d7481eae59dee3e4e20/examples/adminclient/program.cs

using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build())
        {
            try
            {
                await adminClient.CreateTopicsAsync(new TopicSpecification[] { 
                    new TopicSpecification { Name = topicName, ReplicationFactor = 1, NumPartitions = 1 } });
            }
            catch (CreateTopicsException e)
            {
                Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");
            }
        }

相关问题