我使用的是confluent.kafka.net客户端版本1.3.0。我想从给定的时间开始使用消息。
要做到这一点,我需要 OffsetsForTimes
以获得所需的偏移和 Commit
该分区的偏移量:
private void SetOffset()
{
const string Topic = "myTopic";
const string BootstrapServers = "server1, server2";
var adminClient = new AdminClientBuilder(
new Dictionary<string, string>
{
{ "bootstrap.servers", BootstrapServers },
{ "security.protocol", "sasl_plaintext" },
{ "sasl.mechanisms", "PLAIN" },
{ "sasl.username", this.kafkaUsername },
{ "sasl.password", this.kafkaPassword }
}).Build();
var consumer = new ConsumerBuilder<byte[], byte[]>(
new Dictionary<string, string>
{
{ "bootstrap.servers", BootstrapServers },
{ "group.id", this.groupId },
{ "enable.auto.commit", "false" },
{ "security.protocol", "sasl_plaintext" },
{ "sasl.mechanisms", "PLAIN" },
{ "sasl.username", this.kafkaUsername },
{ "sasl.password", this.kafkaPassword }
}).Build();
// Timestamp to which the offset should be set to
var timeStamp = new DateTime(2020, 3, 1, 0, 0, 0, DateTimeKind.Utc);
var newOffsets = new List<TopicPartitionOffset>();
var metadata = adminClient.GetMetadata(Topic, TimeSpan.FromSeconds(30));
foreach (var topicMetadata in metadata.Topics)
{
if (topicMetadata.Topic == Topic)
{
foreach (var partitionMetadata in topicMetadata.Partitions.OrderBy(p => p.PartitionId))
{
var topicPartition = new TopicPartition(topicMetadata.Topic, partitionMetadata.PartitionId);
IEnumerable<TopicPartitionOffset> found = consumer.OffsetsForTimes(
new[] { new TopicPartitionTimestamp(topicPartition, new Timestamp(timeStamp, TimestampType.CreateTime)) },
TimeSpan.FromSeconds(5));
newOffsets.Add(new TopicPartitionOffset(topicPartition, new Offset(found.First().Offset)));
}
}
}
consumer.Commit(newOffsets);
// Consume messages
consumer.Subscribe(Topic);
var consumerResult = consumer.Consume();
// process message
//consumer.Commit(consumerResult);
}
如果我想跳过消息并跳转到给定的偏移量(如果我想跳转到的偏移量在最后一个提交的消息之后),这个方法就可以了。
但是,如果给定的时间戳早于最后提交的消息的时间戳,则上述方法将不起作用。在上述代码中,如果 timeStamp
在最后提交的消息的时间戳之前,则 OffsetsForTimes
将返回最后提交的消息的偏移量+1。即使我手动将偏移设置为较低的偏移 consumer.Commit(newOffsets)
似乎没有任何影响,我得到的第一个未提交的消息时,消费。
有没有办法从代码中实现这一点?
2条答案
按热度按时间sq1bmfud1#
我不是Maven,但我要试着解释一下你是怎么做到的。
首先,我们必须提到subscribe和assign方法。
使用subscribe时,传递一个或多个主题。这样,根据使用者组中的使用者数量,将每个主题的分区列表分配给使用者。主题分区是由主题名称和分区号组成的对象。
可以使用assign传递使用者将读取的分区。此方法不使用使用者的组管理功能(不需要group.id)。如果我没有错,可以在assign方法中指定初始偏移量。
另一个选项是使用seek()方法设置偏移量
如果你要混合订阅和分配记住,你必须使用取消订阅之前。
如果要重新使用所有消息,另一个选项是在新的不同使用者组中创建一个使用者。
举例(复习)
我现在就离开你,等会儿再查。我用java做了这个例子,因为我对它比较熟悉。在本例中,我不使用subscribe,而是使用assign。首先检索主题分区,我们设置从中读取消息的开始日期时间,然后为每个分区创建一个指定该日期时间的Map。
通过创建的Map,我们可以使用offsetsfortimes方法获得每个分区在指定日期时间的偏移量。对于每个分区的偏移量,我们使用seek移动到每个分区上的偏移量,最后使用消息。
我现在没有时间检查代码,但我会做的。希望对你有帮助。
y3bcpkx12#
如果分配给每个分区并说明开始读取的偏移量,就可以这样做。
以下是获取主题分区列表的方式:
这就是如何找到特定时间的偏移量:
这是通过分配给所有主题分区和特定偏移量来使用的方式:
如果您坚持要将此应用于使用者组,则另一种选择是重置使用者组并使用您的代码,或者创建新的使用者组。