如何在合流Kafkac#库中获得Kafka主题的最新偏移量?

oprakyz7  于 2021-06-08  发布在  Kafka
关注(0)|答案(3)|浏览(393)

我用的是合流Kafka客户机。如何获取本文档中某个主题的最新偏移量?

yfjy0ee7

yfjy0ee71#

当您收到一条消息时,它应该包括主题、分区和它所在位置的偏移量(除了键和值之外)。
从这里的例子来看:

consumer.OnMessage += (_, msg)
  => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " +
        $"Offset: {msg.Offset} {msg.Value}");

当事件到达每个主题分区的末尾时,也会得到一个事件

consumer.OnPartitionEOF += (_, end)
  => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" +
          $" , next message will be at offset {end.Offset}");
u59ebvdq

u59ebvdq2#

除了前面的答案,你还可以用

List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)

它将返回从librdkafka为给定主题/分区轮询的最后一个偏移量
你也有类似的问题 Committed 方法,用于从使用者提交的最新偏移量
还可以查询最新的已知偏移量

WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)

它将向kafka集群发送一个请求。呼叫阻塞,请设置适当的超时。目前,您不能同时在多个分区上发送请求。您可以使用它来获取最后一个已知的偏移量,或者计算滞后量
还有

WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)

它将查询librdkafka中的内部状态,并可能返回无效的\u偏移量(-1001)。您可以使用它来检测由于处理数据而产生的延迟(此方法的位置和结果之间的差异)

wqsoz72f

wqsoz72f3#

我能够读取主题偏移量,而不是从使用者那里检索偏移量信息(我不想先使用消息)( high 以及 low )像这样的制片人:

var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10));

相关问题