合流kafka如何在不订阅的情况下获取特定主题元数据

wwodge7n  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(687)

我使用一些代码来确定读取偏移量和最大值之间的差异。它需要内部诊断,例如,如果差异低于“基线”值-所以,读取速度是好的。
提供的代码样本工作良好,但我应该知道分区计数的主题。然后,获取主题中每个分区的最大偏移量值。
如何在不订阅主题的情况下获取主题元数据?
类似于:topic“testtopic”有4个分区。

public async Task MaxOffsetValues()
    {
        // just for tests
        // ToDo: use config from settings

        while (true)
        {
            var topicName = "testTopic";
            var config = new ConsumerConfig
            {
                BootstrapServers = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092",
                GroupId = Guid.NewGuid().ToString(),
                ClientId = Dns.GetHostName(),
                EnableAutoCommit = false,
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
            {
                var offsetBorders = consumer.QueryWatermarkOffsets(new TopicPartition(topicName, 0), TimeSpan.FromSeconds(10));
                _log.Debug($"[Diagnostic] Topic: ({topicName}), Partition: ({0}) Minimal offset: ({offsetBorders.Low}) Maximum offset: ({offsetBorders.High})");
            }

            await Task.Delay(TimeSpan.FromSeconds(60));
        }
    }
ijnw1ujt

ijnw1ujt1#

因为我的目标是足够的(为了清晰起见,代码部分减少了):

// ...
var brokerServers = _diagnosticSettings.Value.BrokerHosts;
var brokerDelayInSeconds = TimeSpan.FromSeconds(_diagnosticSettings.Value.BrokerDelayInSeconds);

var adminClientConfig = new AdminClientConfig
    {
        BootstrapServers = brokerServers,
        ClientId = Dns.GetHostName(),
    };

adminClient = new AdminClientBuilder(adminClientConfig).Build();

Metadata topicMetadata = null;

topicMetadata = adminClient.GetMetadata(topicExternalName, brokerDelayInSeconds);

partitionCount = topicMetadata.Topics[0].Partitions.Count;

// ...
svmlkihl

svmlkihl2#

如果您正在寻找获取元数据的编程和非编程方法,那么有3种方法可以在不订阅主题的情况下获取有关主题的信息。
管理客户端(https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/admin/adminclient.html)
如果您有权访问confluent control center,那么它会显示有关主题的元数据。
也可以使用kafka-topics.sh cli工具

相关问题