apache-kafka Kafka的消费--分配与寻找

2ledvvac  于 2022-11-01  发布在  Apache
关注(0)|答案(2)|浏览(206)

我无法理解这个API设计!
在下面的代码中,我们订阅了一个主题列表,并动态分配了分区。

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    consumer.subscribe(Arrays.asList("some-topic"));

    while(true){

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        StreamSupport.stream(records.spliterator(), false)
                    .forEach(r -> {
                        System.out.println(r.key() + "::" + r.value());
                    });

    }

困惑就在这里。

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
    //seek for specific partition
    TopicPartition partition = new TopicPartition("some-topic", 0);
    consumer.assign(Arrays.asList(partition));
    consumer.seek(partition, 0);

    while(true){

        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        StreamSupport.stream(records.spliterator(), false)
                .forEach(r -> {
                    System.out.println(r.key() + "::" + r.value());
                });

    }

问题:

1.我们已经使用assign方法分配了分区列表。为什么seek方法还查找分区信息?不知何故,我觉得它是冗余

  1. seek方法具有带主题和偏移量的分区。为什么它需要先使用assign才能调用seek
t9aqgxwy

t9aqgxwy1#

1 -首先要记住你的消费者可以通过kafka API被分配到许多不同的主题/分区。然后,seek和assign有两个不同的独立责任,这就是为什么你可能认为它是多余的,但是无论何时你需要回到一个偏移,或者出于什么原因你需要重做一个偏移,你都要使用seek,为此,seek()需要主题和分区信息,您使用了静态分配(分配)或动态分配(订阅)。
您不能只使用seek()而不指定topic)/ partition,在许多情况下,它是不明确的。
2 -你确定你需要在调用seek()之前做assign吗?我知道这两个都可以在调用poll()之前使用..,但不知道assign在seek()之前是强制性的..,你有错误消息吗(我明天可能会检查一下,然后编辑这篇文章)
扬尼克

gupuwyp2

gupuwyp22#

简短的回答--没有必要总是在“assign”之后调用“seek”。
长长的回答-

consumer.subscribe(Arrays.asList("some-topic")); and 
consumer.assign(Arrays.asList(partition));

做类似的工作,除了一个细节-
“subscribe”会将主题分配给属于某个使用者组的使用者。这会处理将来的任何重新平衡,即,如果将新分区添加到主题,使用者组将自动调整其使用者,以从新分区使用。
“assign”赋予调用方代码更多的权力和责任。它不会进行任何重新平衡,调用方负责提交偏移量。只有当您希望从特定偏移量开始使用时才调用“seek”。在这种情况下,它是偏移量“0”,与初始赋值相同,因此使其成为冗余。seek用于转到特定偏移量

相关问题