我使用一些代码来确定读取偏移量和最大值之间的差异。它需要内部诊断,例如,如果差异低于“基线”值-所以,读取速度是好的。
提供的代码样本工作良好,但我应该知道分区计数的主题。然后,获取主题中每个分区的最大偏移量值。
如何在不订阅主题的情况下获取主题元数据?
类似于:topic“testtopic”有4个分区。
public async Task MaxOffsetValues()
{
// just for tests
// ToDo: use config from settings
while (true)
{
var topicName = "testTopic";
var config = new ConsumerConfig
{
BootstrapServers = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092",
GroupId = Guid.NewGuid().ToString(),
ClientId = Dns.GetHostName(),
EnableAutoCommit = false,
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
var offsetBorders = consumer.QueryWatermarkOffsets(new TopicPartition(topicName, 0), TimeSpan.FromSeconds(10));
_log.Debug($"[Diagnostic] Topic: ({topicName}), Partition: ({0}) Minimal offset: ({offsetBorders.Low}) Maximum offset: ({offsetBorders.High})");
}
await Task.Delay(TimeSpan.FromSeconds(60));
}
}
2条答案
按热度按时间ijnw1ujt1#
因为我的目标是足够的(为了清晰起见,代码部分减少了):
svmlkihl2#
如果您正在寻找获取元数据的编程和非编程方法,那么有3种方法可以在不订阅主题的情况下获取有关主题的信息。
管理客户端(https://kafka.apache.org/23/javadoc/index.html?org/apache/kafka/clients/admin/adminclient.html)
如果您有权访问confluent control center,那么它会显示有关主题的元数据。
也可以使用kafka-topics.sh cli工具