我用的是合流Kafka客户机。如何获取本文档中某个主题的最新偏移量?
yfjy0ee71#
当您收到一条消息时,它应该包括主题、分区和它所在位置的偏移量(除了键和值之外)。从这里的例子来看:
consumer.OnMessage += (_, msg) => Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} " + $"Offset: {msg.Offset} {msg.Value}");
当事件到达每个主题分区的末尾时,也会得到一个事件
consumer.OnPartitionEOF += (_, end) => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}" + $" , next message will be at offset {end.Offset}");
u59ebvdq2#
除了前面的答案,你还可以用
List<TopicPartitionOffsetError> Position(IEnumerable<TopicPartition> partitions)
它将返回从librdkafka为给定主题/分区轮询的最后一个偏移量你也有类似的问题 Committed 方法,用于从使用者提交的最新偏移量还可以查询最新的已知偏移量
Committed
WatermarkOffsets QueryWatermarkOffsets(TopicPartition topicPartition, TimeSpan timeout)
它将向kafka集群发送一个请求。呼叫阻塞,请设置适当的超时。目前,您不能同时在多个分区上发送请求。您可以使用它来获取最后一个已知的偏移量,或者计算滞后量还有
WatermarkOffsets GetWatermarkOffsets(TopicPartition topicPartition)
它将查询librdkafka中的内部状态,并可能返回无效的\u偏移量(-1001)。您可以使用它来检测由于处理数据而产生的延迟(此方法的位置和结果之间的差异)
wqsoz72f3#
我能够读取主题偏移量,而不是从使用者那里检索偏移量信息(我不想先使用消息)( high 以及 low )像这样的制片人:
high
low
var partitionOffset = _producer.QueryWatermarkOffsets(new TopicPartition("myTopic", myPartition), TimeSpan.FromSeconds(10));
3条答案
按热度按时间yfjy0ee71#
当您收到一条消息时,它应该包括主题、分区和它所在位置的偏移量(除了键和值之外)。
从这里的例子来看:
当事件到达每个主题分区的末尾时,也会得到一个事件
u59ebvdq2#
除了前面的答案,你还可以用
它将返回从librdkafka为给定主题/分区轮询的最后一个偏移量
你也有类似的问题
Committed
方法,用于从使用者提交的最新偏移量还可以查询最新的已知偏移量
它将向kafka集群发送一个请求。呼叫阻塞,请设置适当的超时。目前,您不能同时在多个分区上发送请求。您可以使用它来获取最后一个已知的偏移量,或者计算滞后量
还有
它将查询librdkafka中的内部状态,并可能返回无效的\u偏移量(-1001)。您可以使用它来检测由于处理数据而产生的延迟(此方法的位置和结果之间的差异)
wqsoz72f3#
我能够读取主题偏移量,而不是从使用者那里检索偏移量信息(我不想先使用消息)(
high
以及low
)像这样的制片人: