有没有办法从Kafka的主题中得到最后的信息?

ruarlubt  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(667)

我有一个带有多个分区的kafka主题,我想知道java中是否有方法获取该主题的最后一条消息。我不在乎分区我只想得到最新的消息。
我试过了 @KafkaListener 但它只在主题更新时获取消息。如果在应用程序打开后没有发布任何内容,则不会返回任何内容。
也许听众根本不是解决问题的正确方法?

ppcbkaq5

ppcbkaq51#

下面这个片段对我很有用。你可以试试这个。评论中的解释。

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        consumer.poll(Duration.ofSeconds(10));

        consumer.assignment().forEach(System.out::println);

        AtomicLong maxTimestamp = new AtomicLong();
        AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();

        // get the last offsets for each partition
        consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {
            System.out.println("offset: "+offset);

            // seek to the last offset of each partition
            consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);

            // poll to get the last record in each partition
            consumer.poll(Duration.ofSeconds(10)).forEach(record -> {

                // the latest record in the 'topic' is the one with the highest timestamp
                if (record.timestamp() > maxTimestamp.get()) {
                    maxTimestamp.set(record.timestamp());
                    latestRecord.set(record);
                }
            });
        });
        System.out.println(latestRecord.get());
am46iovg

am46iovg2#

您必须使用来自每个分区的最新消息,然后在客户端进行比较(如果消息包含时间戳,则使用消息上的时间戳)。原因是Kafka不保证分区间的有序性。在分区内,可以确保偏移量最大的消息是推送到它的最新消息。

相关问题