重新使用给定时间的Kafka消息

qlfbtfca  于 2021-06-05  发布在  Kafka
关注(0)|答案(2)|浏览(424)

我使用的是confluent.kafka.net客户端版本1.3.0。我想从给定的时间开始使用消息。
要做到这一点,我需要 OffsetsForTimes 以获得所需的偏移和 Commit 该分区的偏移量:

private void SetOffset()
{
    const string Topic = "myTopic";
    const string BootstrapServers = "server1, server2";

    var adminClient = new AdminClientBuilder(
        new Dictionary<string, string>
        {
            { "bootstrap.servers", BootstrapServers },
            { "security.protocol", "sasl_plaintext" },
            { "sasl.mechanisms", "PLAIN" },
            { "sasl.username", this.kafkaUsername },
            { "sasl.password", this.kafkaPassword }
        }).Build();

    var consumer = new ConsumerBuilder<byte[], byte[]>(
        new Dictionary<string, string>
        {
            { "bootstrap.servers", BootstrapServers },
            { "group.id", this.groupId },
            { "enable.auto.commit", "false" },
            { "security.protocol", "sasl_plaintext" },
            { "sasl.mechanisms", "PLAIN" },
            { "sasl.username", this.kafkaUsername },
            { "sasl.password", this.kafkaPassword }
        }).Build();

    // Timestamp to which the offset should be set to
    var timeStamp = new DateTime(2020, 3, 1, 0, 0, 0, DateTimeKind.Utc);

    var newOffsets = new List<TopicPartitionOffset>();
    var metadata = adminClient.GetMetadata(Topic, TimeSpan.FromSeconds(30));
    foreach (var topicMetadata in metadata.Topics)
    {
        if (topicMetadata.Topic == Topic)
        {
            foreach (var partitionMetadata in topicMetadata.Partitions.OrderBy(p => p.PartitionId))
            {
                var topicPartition = new TopicPartition(topicMetadata.Topic, partitionMetadata.PartitionId);

                IEnumerable<TopicPartitionOffset> found = consumer.OffsetsForTimes(
                    new[] { new TopicPartitionTimestamp(topicPartition, new Timestamp(timeStamp, TimestampType.CreateTime)) },
                    TimeSpan.FromSeconds(5));

                newOffsets.Add(new TopicPartitionOffset(topicPartition, new Offset(found.First().Offset)));
            }
        }
    }

    consumer.Commit(newOffsets);

    // Consume messages
    consumer.Subscribe(Topic);
    var consumerResult = consumer.Consume();
    // process message
    //consumer.Commit(consumerResult);
}

如果我想跳过消息并跳转到给定的偏移量(如果我想跳转到的偏移量在最后一个提交的消息之后),这个方法就可以了。
但是,如果给定的时间戳早于最后提交的消息的时间戳,则上述方法将不起作用。在上述代码中,如果 timeStamp 在最后提交的消息的时间戳之前,则 OffsetsForTimes 将返回最后提交的消息的偏移量+1。即使我手动将偏移设置为较低的偏移 consumer.Commit(newOffsets) 似乎没有任何影响,我得到的第一个未提交的消息时,消费。
有没有办法从代码中实现这一点?

sq1bmfud

sq1bmfud1#

我不是Maven,但我要试着解释一下你是怎么做到的。
首先,我们必须提到subscribe和assign方法。
使用subscribe时,传递一个或多个主题。这样,根据使用者组中的使用者数量,将每个主题的分区列表分配给使用者。主题分区是由主题名称和分区号组成的对象。

consumer.Subscribe(Topic);

可以使用assign传递使用者将读取的分区。此方法不使用使用者的组管理功能(不需要group.id)。如果我没有错,可以在assign方法中指定初始偏移量。

consumer.Assign(topicName, 0, new Offset(lastConsumedOffset));
consumer.Assign(topicPartition, new Offset(lastConsumedOffset));

另一个选项是使用seek()方法设置偏移量

consumer.Seek(topicPartitionOffset);

如果你要混合订阅和分配记住,你必须使用取消订阅之前。
如果要重新使用所有消息,另一个选项是在新的不同使用者组中创建一个使用者。

举例(复习)

我现在就离开你,等会儿再查。我用java做了这个例子,因为我对它比较熟悉。在本例中,我不使用subscribe,而是使用assign。首先检索主题分区,我们设置从中读取消息的开始日期时间,然后为每个分区创建一个指定该日期时间的Map。
通过创建的Map,我们可以使用offsetsfortimes方法获得每个分区在指定日期时间的偏移量。对于每个分区的偏移量,我们使用seek移动到每个分区上的偏移量,最后使用消息。
我现在没有时间检查代码,但我会做的。希望对你有帮助。

AdminClient client = AdminClient.create(getAdminClientProperties());
        KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<String, GenericRecord>(
                getConsumerProperties());

        String TOPIC = "topic";

        // get info of all partitions of a topic
        List<PartitionInfo> partitionsInfo = consumer.partitionsFor(TOPIC);

        // create TopicPartition list
        Set<TopicPartition> partitions = new HashSet<>();
        for (PartitionInfo p : partitionsInfo) {
            partitions.add(new TopicPartition(p.topic(), p.partition()));
        }

        // Consumer will read from all partitions
        consumer.assign(partitions);
        DateTime timeToStartReadMessagesFrom = new DateTime(2020, 3, 1, 0, 0, 0);

        Map<TopicPartition, Long> timestamps = new HashMap<>();
        for (TopicPartition tp : partitions) {
            timestamps.put(tp, timeToStartReadMessagesFrom.getMillis());
        }
        // get the offset for that time in each partition
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, offsets.get(tp).offset());
        }

        while (true) {
            final ConsumerRecords<String, GenericRecord> consumerRecords = consumer.poll(1000);
            // do something
            break;
        }
        consumer.close();
        System.out.println("DONE");
y3bcpkx1

y3bcpkx12#

如果分配给每个分区并说明开始读取的偏移量,就可以这样做。
以下是获取主题分区列表的方式:

public static List<TopicPartition> GetTopicPartitions(string bootstrapServers, string topicValue) {
    var tp = new List<TopicPartition>();
    using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = bootstrapServers }).Build()) {
        var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
        meta.Topics.ForEach(topic => {
            if (topic.Topic == topicValue) {
                foreach (PartitionMetadata partition in topic.Partitions) {
                    tp.Add(new TopicPartition(topic.Topic, partition.PartitionId));
                }
            }
        });
    }
    return tp;
}

这就是如何找到特定时间的偏移量:

List<TopicPartition> topic_partitions = frmMain.GetTopicPartitions(mBootstrapServers, txtTopic.Text);

using (var consumer = new ConsumerBuilder<Ignore, string>(cfg).Build()) {
    consumer.Assign(topic_partitions);

    List<TopicPartitionTimestamp> new_times = new List<TopicPartitionTimestamp>();
    foreach (TopicPartition tp in topic_partitions) {
        new_times.Add(new TopicPartitionTimestamp(tp, new Timestamp(dtpNewTime.Value)));
    }

    List<TopicPartitionOffset> seeked_offsets = consumer.OffsetsForTimes(new_times, TimeSpan.FromSeconds(40));
    string s = "";
    foreach (TopicPartitionOffset tpo in seeked_offsets) {
        s += $"{tpo.TopicPartition}: {tpo.Offset.Value}\n";
    }
    Console.WriteLine(s);
    consumer.Close();
}

这是通过分配给所有主题分区和特定偏移量来使用的方式:

using (var consumer =
    new ConsumerBuilder<string, string>(config)
        .SetErrorHandler((_, e) => Log($"Error: {e.Reason}"))
        .Build()) {
    consumer.Assign(seeked_offsets);

    try {
        while (true) {
            try {
                var r = consumer.Consume(cancellationToken);
                // do something with r
            } catch (ConsumeException e) {
                //Log($"Consume error: {e.Error.Reason}");
            }
        }
    } catch (OperationCanceledException) {
        //Log("Closing consumer.");
        consumer.Close();
    }
}

如果您坚持要将此应用于使用者组,则另一种选择是重置使用者组并使用您的代码,或者创建新的使用者组。

相关问题